RtpOneRoadThread.cpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. #include "RtpOneRoadThread.h"
  2. #include "spdlog.h"
  3. RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
  4. : BaseRecordThread(threadInfo),
  5. m_eventLoop()
  6. {
  7. m_logger = spdlog::get("RTPServer");
  8. if(m_logger == nullptr)
  9. {
  10. fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器");
  11. return;
  12. }
  13. m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(),
  14. m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  15. m_isRunning.store(true); // 设置为运行状态
  16. }
  17. RTPOneRoadThread::~RTPOneRoadThread()
  18. {
  19. }
  20. /**
  21. * @brief 停止线程
  22. */
  23. void RTPOneRoadThread::stopThread()
  24. {
  25. if(m_sendTimer.isActive())
  26. {
  27. m_sendTimer.stop();
  28. }
  29. while(m_eventLoop.isRunning())
  30. {
  31. m_eventLoop.quit();
  32. m_eventLoop.processEvents(QEventLoop::AllEvents, 10);
  33. }
  34. }
  35. /* 设置数据 */
  36. bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
  37. {
  38. /* UDP会话列表是空的,就不接收数据了 */
  39. if(!m_isRecvData.load())
  40. {
  41. return true;
  42. }
  43. /* 如果数据列表1被占用,就写入数据列表2 */
  44. AudioSrcData* data = new AudioSrcData(srcData);
  45. if(data == nullptr)
  46. {
  47. return false;
  48. }
  49. m_ringQueue.push(data);
  50. return false;
  51. }
  52. /* 添加一个UDP会话 */
  53. bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession)
  54. {
  55. /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */
  56. for(const auto& session : m_listUdpSockets)
  57. {
  58. if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort)
  59. {
  60. SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  61. return true;
  62. }
  63. }
  64. /* 创建新的UDP套接字 */
  65. QUdpSocket* udpSocket = new QUdpSocket();
  66. /* 绑定本地IP和端口 */
  67. if(!udpSocket->bind(QHostAddress::Any, udpSession.localPort))
  68. {
  69. SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  70. delete udpSocket;
  71. return false;
  72. }
  73. SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  74. // 将新的会话添加到列表中
  75. RtpSendClientInfo_t newSession = udpSession;
  76. newSession.udpSocket = udpSocket;
  77. m_lockUdpSockets.lock();
  78. m_listUdpSockets.append(newSession);
  79. m_lockUdpSockets.unlock();
  80. m_isRecvData.store(true); // 设置为接收数据状态
  81. return true;
  82. }
  83. /* 删除一个会话 */
  84. bool RTPOneRoadThread::removeUdpSession(const RtpSendClientInfo_t& udpSession)
  85. {
  86. std::lock_guard<QMutex> lock(m_lockUdpSockets);
  87. for(auto it = m_listUdpSockets.begin(); it != m_listUdpSockets.end(); ++it)
  88. {
  89. if(it->clientIP == udpSession.clientIP && it->clientPort == udpSession.clientPort)
  90. {
  91. if(it->udpSocket != nullptr)
  92. {
  93. it->udpSocket->close();
  94. delete it->udpSocket;
  95. it->udpSocket = nullptr;
  96. }
  97. SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort);
  98. // 从列表中删除该会话
  99. m_listUdpSockets.erase(it);
  100. break;
  101. }
  102. }
  103. /* 如果列表为空,就不再接收数据 */
  104. if(m_listUdpSockets.isEmpty())
  105. {
  106. m_isRecvData.store(false);
  107. SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
  108. /* 清空环形队列 */
  109. while(m_ringQueue.QueueSize() > 0)
  110. {
  111. auto data = m_ringQueue.front_pop_noBlock();
  112. if(data != nullptr)
  113. {
  114. delete data;
  115. data = nullptr;
  116. }
  117. }
  118. }
  119. return true;
  120. }
  121. /* 发送数据的线程函数 */
  122. void RTPOneRoadThread::task()
  123. {
  124. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开启RTP发送数据线程 ", m_logBase);
  125. if(!initData())
  126. {
  127. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  128. return;
  129. }
  130. m_sendTimer.setTimerType(Qt::PreciseTimer);
  131. m_sendTimer.setSingleShot(false); // 设置为循环定时器
  132. m_sendTimer.setInterval(10); // 每10毫秒发送一次数据
  133. QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData);
  134. m_sendTimer.start();
  135. /* 将线程控制权交给Qt的事件循环 */
  136. m_eventLoop.exec();
  137. /* 清空数据 */
  138. clearData();
  139. SPDLOG_LOGGER_WARN(m_logger, "➢ {} RTP发送数据线程结束 ", m_logBase);
  140. }
  141. /* 初始化数据 */
  142. bool RTPOneRoadThread::initData()
  143. {
  144. m_listUdpSockets.clear();
  145. m_isRecvData.store(false);
  146. m_ringQueue.setQueueCapacity(512);
  147. m_ringQueue.setDefaultValue(nullptr);
  148. return true;
  149. }
  150. /* 清除数据*/
  151. void RTPOneRoadThread::clearData()
  152. {
  153. m_isRecvData.store(false); // 设置为不接收数据状态
  154. /* 清空UDP会话列表 */
  155. std::lock_guard<QMutex> lock(m_lockUdpSockets);
  156. for(auto& session : m_listUdpSockets)
  157. {
  158. if(session.udpSocket != nullptr)
  159. {
  160. session.udpSocket->close();
  161. delete session.udpSocket;
  162. session.udpSocket = nullptr;
  163. }
  164. }
  165. m_listUdpSockets.clear();
  166. /* 清空环形队列 */
  167. while(m_ringQueue.QueueSize() > 0)
  168. {
  169. auto data = m_ringQueue.front_pop_noBlock();
  170. if(data != nullptr)
  171. {
  172. delete data;
  173. data = nullptr;
  174. }
  175. }
  176. }
  177. /**
  178. * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去
  179. * 然后紧接着获取第二个缓冲区的数据,也一起发送出去
  180. *
  181. * @return true
  182. * @return false
  183. */
  184. bool RTPOneRoadThread::sendData()
  185. {
  186. while(m_ringQueue.QueueSize() > 0 && m_isRunning.load())
  187. {
  188. auto data = m_ringQueue.front_pop_noBlock();
  189. if(data == nullptr)
  190. {
  191. continue;
  192. }
  193. if(data->isEmpty())
  194. {
  195. delete data;
  196. continue;
  197. }
  198. std::lock_guard<QMutex> lock(m_lockUdpSockets);
  199. /* 遍历所有的UDP会话,发送数据 */
  200. for(const auto& session : m_listUdpSockets)
  201. {
  202. if(session.udpSocket == nullptr || !session.udpSocket->isValid())
  203. {
  204. SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
  205. continue;
  206. }
  207. qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
  208. QHostAddress(session.clientIP), session.clientPort);
  209. if(bytesSent == -1)
  210. {
  211. SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
  212. }
  213. else
  214. {
  215. SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送数据成功: {}:{} 大小: {}", m_logBase, session.clientIP.toStdString(), session.clientPort, bytesSent);
  216. }
  217. }
  218. /* 发送完数据后,删除这个数据 */
  219. delete data;
  220. data = nullptr;
  221. }
  222. return true;
  223. }
  224. /* 定时发送数据 */
  225. void RTPOneRoadThread::do_timerSendData()
  226. {
  227. if(!m_isRecvData)
  228. {
  229. return;
  230. }
  231. /* 先初始化UDP */
  232. /* 发送数据 */
  233. sendData();
  234. }