RtpOneRoadThread.cpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. #include "RtpOneRoadThread.h"
  2. RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
  3. : BaseRecordThread(threadInfo),
  4. m_eventLoop()
  5. {
  6. m_logger = spdlog::get("RTPServer");
  7. if(m_logger == nullptr)
  8. {
  9. fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器");
  10. return;
  11. }
  12. m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(),
  13. m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  14. m_isRunning.store(true); // 设置为运行状态
  15. }
  16. RTPOneRoadThread::~RTPOneRoadThread()
  17. {
  18. }
  19. /**
  20. * @brief 停止线程
  21. */
  22. void RTPOneRoadThread::stopThread()
  23. {
  24. if(m_sendTimer.isActive())
  25. {
  26. m_sendTimer.stop();
  27. }
  28. while(m_eventLoop.isRunning())
  29. {
  30. m_eventLoop.quit();
  31. m_eventLoop.processEvents(QEventLoop::AllEvents, 10);
  32. }
  33. }
  34. /* 设置数据 */
  35. bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
  36. {
  37. /* UDP会话列表是空的,就不接收数据了 */
  38. if(!m_isRecvData.load())
  39. {
  40. return true;
  41. }
  42. /* 如果数据列表1被占用,就写入数据列表2 */
  43. AudioSrcData* data = new AudioSrcData(srcData);
  44. if(data == nullptr)
  45. {
  46. return false;
  47. }
  48. m_ringQueue.push(data);
  49. return false;
  50. }
  51. /* 添加一个UDP会话 */
  52. bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession)
  53. {
  54. /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */
  55. for(const auto& session : m_listUdpSockets)
  56. {
  57. if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort)
  58. {
  59. SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  60. return true;
  61. }
  62. }
  63. /* 创建新的UDP套接字 */
  64. QUdpSocket* udpSocket = new QUdpSocket();
  65. /* 绑定本地IP和端口 */
  66. if(!udpSocket->bind(QHostAddress::Any, udpSession.localPort))
  67. {
  68. SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  69. delete udpSocket;
  70. return false;
  71. }
  72. SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  73. // 将新的会话添加到列表中
  74. RtpSendClientInfo_t newSession = udpSession;
  75. newSession.udpSocket = udpSocket;
  76. m_lockUdpSockets.lock();
  77. m_listUdpSockets.append(newSession);
  78. m_lockUdpSockets.unlock();
  79. m_isRecvData.store(true); // 设置为接收数据状态
  80. return true;
  81. }
  82. /* 删除一个会话 */
  83. bool RTPOneRoadThread::removeUdpSession(const RtpSendClientInfo_t& udpSession)
  84. {
  85. std::lock_guard<QMutex> lock(m_lockUdpSockets);
  86. for(auto it = m_listUdpSockets.begin(); it != m_listUdpSockets.end(); ++it)
  87. {
  88. if(it->clientIP == udpSession.clientIP && it->clientPort == udpSession.clientPort)
  89. {
  90. if(it->udpSocket != nullptr)
  91. {
  92. it->udpSocket->close();
  93. delete it->udpSocket;
  94. it->udpSocket = nullptr;
  95. }
  96. SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort);
  97. // 从列表中删除该会话
  98. m_listUdpSockets.erase(it);
  99. break;
  100. }
  101. }
  102. /* 如果列表为空,就不再接收数据 */
  103. if(m_listUdpSockets.isEmpty())
  104. {
  105. m_isRecvData.store(false);
  106. SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
  107. /* 清空环形队列 */
  108. while(m_ringQueue.QueueSize() > 0)
  109. {
  110. auto data = m_ringQueue.front_pop_noBlock();
  111. if(data != nullptr)
  112. {
  113. delete data;
  114. data = nullptr;
  115. }
  116. }
  117. }
  118. return true;
  119. }
  120. /* 发送数据的线程函数 */
  121. void RTPOneRoadThread::task()
  122. {
  123. SPDLOG_LOGGER_INFO(m_logger, "------------- {} 开启RTP发送数据线程 -------------", m_logBase);
  124. if(!initData())
  125. {
  126. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  127. return;
  128. }
  129. m_sendTimer.setTimerType(Qt::PreciseTimer);
  130. m_sendTimer.setSingleShot(false); // 设置为循环定时器
  131. m_sendTimer.setInterval(10); // 每10毫秒发送一次数据
  132. QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData);
  133. m_sendTimer.start();
  134. /* 将线程控制权交给Qt的事件循环 */
  135. m_eventLoop.exec();
  136. /* 清空数据 */
  137. clearData();
  138. SPDLOG_LOGGER_INFO(m_logger, "------------- {} RTP发送数据线程结束 -------------", m_logBase);
  139. }
  140. /* 初始化数据 */
  141. bool RTPOneRoadThread::initData()
  142. {
  143. m_listUdpSockets.clear();
  144. m_isRecvData.store(false);
  145. m_ringQueue.setQueueCapacity(512);
  146. m_ringQueue.setDefaultValue(nullptr);
  147. return true;
  148. }
  149. /* 清除数据*/
  150. void RTPOneRoadThread::clearData()
  151. {
  152. m_isRecvData.store(false); // 设置为不接收数据状态
  153. /* 清空UDP会话列表 */
  154. std::lock_guard<QMutex> lock(m_lockUdpSockets);
  155. for(auto& session : m_listUdpSockets)
  156. {
  157. if(session.udpSocket != nullptr)
  158. {
  159. session.udpSocket->close();
  160. delete session.udpSocket;
  161. session.udpSocket = nullptr;
  162. }
  163. }
  164. m_listUdpSockets.clear();
  165. /* 清空环形队列 */
  166. while(m_ringQueue.QueueSize() > 0)
  167. {
  168. auto data = m_ringQueue.front_pop_noBlock();
  169. if(data != nullptr)
  170. {
  171. delete data;
  172. data = nullptr;
  173. }
  174. }
  175. }
  176. /**
  177. * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去
  178. * 然后紧接着获取第二个缓冲区的数据,也一起发送出去
  179. *
  180. * @return true
  181. * @return false
  182. */
  183. bool RTPOneRoadThread::sendData()
  184. {
  185. while(m_ringQueue.QueueSize() > 0 && m_isRunning.load())
  186. {
  187. auto data = m_ringQueue.front_pop_noBlock();
  188. if(data == nullptr)
  189. {
  190. continue;
  191. }
  192. if(data->isEmpty())
  193. {
  194. delete data;
  195. continue;
  196. }
  197. std::lock_guard<QMutex> lock(m_lockUdpSockets);
  198. /* 遍历所有的UDP会话,发送数据 */
  199. for(const auto& session : m_listUdpSockets)
  200. {
  201. if(session.udpSocket == nullptr || !session.udpSocket->isValid())
  202. {
  203. SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
  204. continue;
  205. }
  206. qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
  207. QHostAddress(session.clientIP), session.clientPort);
  208. if(bytesSent == -1)
  209. {
  210. SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
  211. }
  212. else
  213. {
  214. SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送数据成功: {}:{} 大小: {}", m_logBase, session.clientIP.toStdString(), session.clientPort, bytesSent);
  215. }
  216. }
  217. /* 发送完数据后,删除这个数据 */
  218. delete data;
  219. data = nullptr;
  220. }
  221. return true;
  222. }
  223. /* 定时发送数据 */
  224. void RTPOneRoadThread::do_timerSendData()
  225. {
  226. if(!m_isRecvData)
  227. {
  228. return;
  229. }
  230. sendData();
  231. }