RtpOneRoadThread.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. #include "RtpOneRoadThread.h"
  2. #include "spdlog.h"
  3. RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
  4. : BaseRecordThread(threadInfo),
  5. m_eventLoop()
  6. {
  7. m_logger = spdlog::get("RTPServer");
  8. if(m_logger == nullptr)
  9. {
  10. fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器");
  11. return;
  12. }
  13. m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(),
  14. m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  15. }
  16. RTPOneRoadThread::~RTPOneRoadThread()
  17. {
  18. }
  19. /**
  20. * @brief 停止线程
  21. */
  22. void RTPOneRoadThread::stopThread()
  23. {
  24. if(m_sendTimer.isActive())
  25. {
  26. m_sendTimer.stop();
  27. }
  28. if(m_eventLoop.isRunning())
  29. {
  30. m_eventLoop.quit();
  31. m_eventLoop.processEvents(QEventLoop::AllEvents, 10);
  32. }
  33. while(m_isRunning.load())
  34. {
  35. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  36. }
  37. }
  38. /* 设置数据 */
  39. bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
  40. {
  41. /* UDP会话列表是空的,就不接收数据了 */
  42. if(!m_isRecvData.load())
  43. {
  44. return true;
  45. }
  46. AudioSrcData* data = new AudioSrcData(srcData);
  47. if(data == nullptr)
  48. {
  49. return false;
  50. }
  51. if(!m_ringQueue.push_noBlock(data))
  52. {
  53. SPDLOG_LOGGER_ERROR(m_logger, "{} 环形队列已满,无法添加新的音频数据", m_logBase);
  54. delete data; // 删除无用数据
  55. return false;
  56. }
  57. return false;
  58. }
  59. /* 添加一个UDP会话 */
  60. bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession)
  61. {
  62. /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */
  63. for(const auto& session : m_listClients)
  64. {
  65. if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort)
  66. {
  67. SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  68. return false;
  69. }
  70. }
  71. SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  72. /* 绑定UDP本地端口 */
  73. if(m_udpState == eUDPState::eUDP_None)
  74. {
  75. m_udpState = eUDPState::eUDP_Init;
  76. m_localIP = udpSession.localIP;
  77. // m_localPort = udpSession.localPort;
  78. }
  79. std::lock_guard<std::mutex> lock(m_lockClients);
  80. m_listClients.append(udpSession);
  81. return true;
  82. }
  83. /* 删除一个会话 */
  84. bool RTPOneRoadThread::removeUdpSession(QString clientIP, quint16 clientPort)
  85. {
  86. std::lock_guard<std::mutex> lock(m_lockClients);
  87. for(auto it = m_listClients.begin(); it != m_listClients.end(); ++it)
  88. {
  89. if(it->clientIP == clientIP && it->clientPort == clientPort)
  90. {
  91. // if(it->udpSocket != nullptr)
  92. // {
  93. // it->udpSocket->close();
  94. // delete it->udpSocket;
  95. // it->udpSocket = nullptr;
  96. // }
  97. SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort);
  98. // 从列表中删除该会话
  99. m_listClients.erase(it);
  100. break;
  101. }
  102. }
  103. /* 如果列表为空,就不再接收数据 */
  104. if(m_listClients.isEmpty())
  105. {
  106. /* 清空环形队列 */
  107. while(m_ringQueue.QueueSize() > 0)
  108. {
  109. auto data = m_ringQueue.front_pop_noBlock();
  110. if(data != nullptr)
  111. {
  112. delete data;
  113. data = nullptr;
  114. }
  115. }
  116. m_udpState = eUDPState::eUDP_Closed;
  117. SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
  118. }
  119. return true;
  120. }
  121. /* 发送数据的线程函数 */
  122. void RTPOneRoadThread::task()
  123. {
  124. SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开启RTP发送数据线程 ", m_logBase);
  125. if(!initData())
  126. {
  127. SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
  128. return;
  129. }
  130. m_sendTimer.setTimerType(Qt::PreciseTimer);
  131. m_sendTimer.setSingleShot(false); // 设置为循环定时器
  132. m_sendTimer.setInterval(10); // 每10毫秒发送一次数据
  133. QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData);
  134. m_sendTimer.start();
  135. m_isRunning.store(true); // 设置为运行状态
  136. /* 将线程控制权交给Qt的事件循环 */
  137. m_eventLoop.exec();
  138. /* 清空数据 */
  139. clearData();
  140. m_isRunning.store(false);
  141. SPDLOG_LOGGER_WARN(m_logger, "➢ {} RTP发送数据线程结束 ", m_logBase);
  142. }
  143. /* 初始化数据 */
  144. bool RTPOneRoadThread::initData()
  145. {
  146. m_listClients.clear();
  147. m_isRecvData.store(false);
  148. m_ringQueue.setQueueCapacity(60);
  149. m_ringQueue.setDefaultValue(nullptr);
  150. return true;
  151. }
  152. /* 清除数据*/
  153. void RTPOneRoadThread::clearData()
  154. {
  155. m_isRecvData.store(false); // 设置为不接收数据状态
  156. /* 清空UDP会话列表 */
  157. std::lock_guard<std::mutex> lock(m_lockClients);
  158. m_listClients.clear();
  159. /* 关闭UDP占用的本地端口 */
  160. if(m_udpSocket != nullptr)
  161. {
  162. m_udpSocket->close();
  163. m_udpSocket->deleteLater();
  164. }
  165. m_udpState = eUDPState::eUDP_None;
  166. /* 清空环形队列 */
  167. while(m_ringQueue.QueueSize() > 0)
  168. {
  169. auto data = m_ringQueue.front_pop_noBlock();
  170. if(data != nullptr)
  171. {
  172. delete data;
  173. data = nullptr;
  174. }
  175. }
  176. }
  177. /* 处理UDP状态 */
  178. bool RTPOneRoadThread::processUdpState()
  179. {
  180. if(m_udpSocket == nullptr)
  181. {
  182. m_udpSocket = new QUdpSocket();
  183. if(m_udpSocket == nullptr)
  184. {
  185. SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败", m_logBase);
  186. return false;
  187. }
  188. m_udpSocket->setSocketOption(QAbstractSocket::LowDelayOption, 1); // 设置低延迟选项
  189. connect(m_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(do_udpError(QAbstractSocket::SocketError)));
  190. }
  191. if(m_udpState == eUDPState::eUDP_Init)
  192. {
  193. if(!m_udpSocket->bind(QHostAddress(m_localIP)))
  194. {
  195. SPDLOG_LOGGER_ERROR(m_logger, "{} 绑定UDP套接字失败: {}:{}", m_logBase, m_localIP.toStdString(), m_udpSocket->localPort());
  196. return false;
  197. }
  198. SPDLOG_LOGGER_INFO(m_logger, "{} 绑定UDP套接字到本地IP: {}:{}", m_logBase,
  199. m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort());
  200. /* 设置为接收数据状态 */
  201. m_isRecvData.store(true);
  202. m_udpState = eUDPState::eUDP_Opened;
  203. }
  204. else if(m_udpState == eUDPState::eUDP_Closed || m_udpState == eUDPState::eUDP_Error)
  205. {
  206. /* 关闭接收数据 */
  207. m_isRecvData.store(false);
  208. /* 关闭UDP占用的本地端口 */
  209. if(m_udpSocket != nullptr)
  210. {
  211. m_udpSocket->close();
  212. }
  213. m_udpState = eUDPState::eUDP_None;
  214. /* 清空连接信息 */
  215. m_listClients.clear();
  216. m_localIP.clear();
  217. // m_localPort = -1;
  218. emit signal_udpClosed(m_threadInfo.cardRoadInfo.nSoundCardNum,
  219. m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  220. /* 清空环形队列中的数据 */
  221. while(m_ringQueue.QueueSize() > 0)
  222. {
  223. auto data = m_ringQueue.front_pop_noBlock();
  224. if(data != nullptr)
  225. {
  226. delete data;
  227. data = nullptr;
  228. }
  229. }
  230. }
  231. return true;
  232. }
  233. /**
  234. * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去
  235. * 然后紧接着获取第二个缓冲区的数据,也一起发送出去
  236. *
  237. * @return true
  238. * @return false
  239. */
  240. bool RTPOneRoadThread::sendData()
  241. {
  242. while(m_ringQueue.QueueSize() > 0)
  243. {
  244. // SPDLOG_LOGGER_TRACE(m_logger, "{} 发送音频数据", m_logBase);
  245. auto data = m_ringQueue.front_pop_noBlock();
  246. if(data == nullptr)
  247. {
  248. continue;
  249. }
  250. if(data->isEmpty())
  251. {
  252. delete data;
  253. continue;
  254. }
  255. std::lock_guard<std::mutex> lock(m_lockClients);
  256. /* 遍历所有的UDP会话,发送数据 */
  257. for(const auto& session : m_listClients)
  258. {
  259. // qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
  260. // QHostAddress(session.clientIP), session.clientPort);
  261. qint64 bytesSent = m_udpSocket->writeDatagram(QByteArray(reinterpret_cast<const char*>(data->pData), data->dataSize),
  262. QHostAddress(session.clientIP), session.clientPort);
  263. if(bytesSent == -1)
  264. {
  265. SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
  266. } else
  267. {
  268. SPDLOG_LOGGER_TRACE(m_logger, "{} 发送数据成功: {}:{}, 本地IP: {}:{}, 大小: {}", m_logBase,
  269. session.clientIP.toStdString(), session.clientPort,
  270. m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort(), bytesSent);
  271. }
  272. }
  273. /* 发送完数据后,删除这个数据 */
  274. delete data;
  275. data = nullptr;
  276. }
  277. return true;
  278. }
  279. /* 定时发送数据 */
  280. void RTPOneRoadThread::do_timerSendData()
  281. {
  282. if(m_udpState == eUDPState::eUDP_None)
  283. {
  284. return;
  285. }
  286. processUdpState();
  287. /* 发送数据 */
  288. sendData();
  289. }
  290. /* UDP错误槽函数 */
  291. void RTPOneRoadThread::do_udpError(QAbstractSocket::SocketError socketError)
  292. {
  293. auto senderSocket = qobject_cast<QUdpSocket*>(sender());
  294. // SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字错误: {}", m_logBase, static_cast<int>(socketError));
  295. /* 这里可以根据需要处理不同的错误 */
  296. if(socketError == QAbstractSocket::SocketError::SocketTimeoutError)
  297. {
  298. SPDLOG_LOGGER_WARN(m_logger, "{} UDP连接超时", m_logBase);
  299. } else
  300. {
  301. SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字发生错误: {}", m_logBase, static_cast<int>(socketError));
  302. SPDLOG_LOGGER_ERROR(m_logger, "{} 错误信息: {}", m_logBase, senderSocket->errorString().toStdString());
  303. }
  304. m_udpState = eUDPState::eUDP_Error;
  305. }