RtpOneRoadThread.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. #include "RtpOneRoadThread.h"
  2. #include "spdlog.h"
  3. RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
  4. : BaseRecordThread(threadInfo),
  5. m_eventLoop()
  6. {
  7. m_logger_local = spdlog::get("RTPServer");
  8. if(m_logger_local == nullptr)
  9. {
  10. fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器");
  11. return;
  12. }
  13. m_logBase = fmt::format("录音通道: {}:{}", threadInfo.cardRoadInfo.strSoundCardName, threadInfo.cardRoadInfo.pcmInfo.strPCMName);
  14. }
  15. RTPOneRoadThread::~RTPOneRoadThread()
  16. {
  17. }
  18. /**
  19. * @brief 停止线程
  20. */
  21. void RTPOneRoadThread::thread_stop()
  22. {
  23. m_isRunning.store(false);
  24. /* 等待事件循环停止 */
  25. while(m_isStoped.load())
  26. {
  27. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  28. }
  29. }
  30. /* 设置数据 */
  31. bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
  32. {
  33. /* UDP会话列表是空的,就不接收数据了 */
  34. if(!m_isRecvData.load())
  35. {
  36. return true;
  37. }
  38. AudioSrcData* pData = new AudioSrcData(srcData);
  39. if(pData == nullptr)
  40. {
  41. return false;
  42. }
  43. auto oldData = m_ringQueue.push_pop(pData);
  44. if(oldData != nullptr)
  45. {
  46. SPDLOG_LOGGER_ERROR(m_logger_local, "{} 环形队列已满,无法添加新的音频数据", m_logBase);
  47. delete oldData; // 删除被覆盖的数据
  48. oldData = nullptr;
  49. }
  50. return false;
  51. }
  52. /* 添加一个UDP会话 */
  53. bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession)
  54. {
  55. /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */
  56. for(const auto& session : m_listClients)
  57. {
  58. if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort)
  59. {
  60. SPDLOG_LOGGER_DEBUG(m_logger_local, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  61. return false;
  62. }
  63. }
  64. SPDLOG_LOGGER_INFO(m_logger_local, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
  65. /* 绑定UDP本地端口 */
  66. if(m_udpState == eUDPState::eUDP_None)
  67. {
  68. m_udpState = eUDPState::eUDP_Init;
  69. m_localIP = udpSession.localIP;
  70. }
  71. std::lock_guard<std::mutex> lock(m_lockClients);
  72. m_listClients.append(udpSession);
  73. /* 触发一次定时器槽函数,执行处理UDP状态的函数 */
  74. emit signal_timerSendData();
  75. return true;
  76. }
  77. /* 删除一个会话 */
  78. bool RTPOneRoadThread::removeUdpSession(QString clientIP, quint16 clientPort)
  79. {
  80. std::lock_guard<std::mutex> lock(m_lockClients);
  81. for(auto it = m_listClients.begin(); it != m_listClients.end(); ++it)
  82. {
  83. if(it->clientIP == clientIP && it->clientPort == clientPort)
  84. {
  85. // if(it->udpSocket != nullptr)
  86. // {
  87. // it->udpSocket->close();
  88. // delete it->udpSocket;
  89. // it->udpSocket = nullptr;
  90. // }
  91. SPDLOG_LOGGER_INFO(m_logger_local, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort);
  92. // 从列表中删除该会话
  93. m_listClients.erase(it);
  94. break;
  95. }
  96. }
  97. /* 如果列表为空,就不再接收数据 */
  98. if(m_listClients.isEmpty())
  99. {
  100. /* 清空环形队列 */
  101. while(m_ringQueue.QueueSize() > 0)
  102. {
  103. auto data = m_ringQueue.front_pop_noBlock();
  104. if(data != nullptr)
  105. {
  106. delete data;
  107. data = nullptr;
  108. }
  109. }
  110. m_udpState = eUDPState::eUDP_Closed;
  111. SPDLOG_LOGGER_DEBUG(m_logger_local, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
  112. /* 触发一次定时器槽函数,执行处理UDP状态的函数 */
  113. emit signal_timerSendData();
  114. }
  115. return true;
  116. }
  117. /* 发送数据的线程函数 */
  118. void RTPOneRoadThread::task()
  119. {
  120. SPDLOG_LOGGER_INFO(m_logger_local, "➢ {} 开启RTP发送数据线程 ", m_logBase);
  121. if(!initData())
  122. {
  123. SPDLOG_LOGGER_ERROR(m_logger_local, "{} 初始化数据失败", m_logBase);
  124. return;
  125. }
  126. m_sendTimer.setTimerType(Qt::PreciseTimer);
  127. m_sendTimer.setSingleShot(false);
  128. m_sendTimer.setInterval(25);
  129. QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData);
  130. connect(this, &RTPOneRoadThread::signal_timerSendData, this, &RTPOneRoadThread::do_timerSendData);
  131. m_sendTimer.start();
  132. m_isRunning.store(true); // 设置为运行状态
  133. m_isStoped.store(false);
  134. /* 将线程控制权交给Qt的事件循环 */
  135. m_eventLoop.exec();
  136. /* 清空数据 */
  137. clearData();
  138. m_isRunning.store(false);
  139. m_isStoped.store(true);
  140. SPDLOG_LOGGER_WARN(m_logger_local, "➢ {} RTP发送数据线程结束 ", m_logBase);
  141. }
  142. /* 初始化数据 */
  143. bool RTPOneRoadThread::initData()
  144. {
  145. m_listClients.clear();
  146. m_isRecvData.store(false);
  147. m_ringQueue.setQueueCapacity(60);
  148. m_ringQueue.setDefaultValue(nullptr);
  149. return true;
  150. }
  151. /* 清除数据*/
  152. void RTPOneRoadThread::clearData()
  153. {
  154. m_isRecvData.store(false); // 设置为不接收数据状态
  155. /* 清空UDP会话列表 */
  156. std::lock_guard<std::mutex> lock(m_lockClients);
  157. m_listClients.clear();
  158. /* 关闭UDP占用的本地端口 */
  159. if(m_udpSocket != nullptr)
  160. {
  161. m_udpSocket->close();
  162. m_udpSocket->deleteLater();
  163. }
  164. m_udpState = eUDPState::eUDP_None;
  165. /* 清空环形队列 */
  166. while(m_ringQueue.QueueSize() > 0)
  167. {
  168. auto data = m_ringQueue.front_pop_noBlock();
  169. if(data != nullptr)
  170. {
  171. delete data;
  172. data = nullptr;
  173. }
  174. }
  175. }
  176. /* 处理UDP状态 */
  177. bool RTPOneRoadThread::processUdpState()
  178. {
  179. if(m_udpSocket == nullptr)
  180. {
  181. m_udpSocket = new QUdpSocket();
  182. if(m_udpSocket == nullptr)
  183. {
  184. SPDLOG_LOGGER_ERROR(m_logger_local, "{} 创建UDP套接字失败", m_logBase);
  185. return false;
  186. }
  187. m_udpSocket->setSocketOption(QAbstractSocket::LowDelayOption, 1); // 设置低延迟选项
  188. connect(m_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(do_udpError(QAbstractSocket::SocketError)));
  189. }
  190. if(m_udpState == eUDPState::eUDP_Init)
  191. {
  192. if(!m_udpSocket->bind(QHostAddress(m_localIP)))
  193. {
  194. SPDLOG_LOGGER_ERROR(m_logger_local, "{} 绑定UDP套接字失败: {}:{}", m_logBase, m_localIP.toStdString(), m_udpSocket->localPort());
  195. return false;
  196. }
  197. SPDLOG_LOGGER_INFO(m_logger_local, "{} 绑定UDP套接字到本地IP: {}:{}", m_logBase,
  198. m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort());
  199. /* 设置为接收数据状态 */
  200. m_isRecvData.store(true);
  201. m_udpState = eUDPState::eUDP_Opened;
  202. }
  203. else if(m_udpState == eUDPState::eUDP_Closed || m_udpState == eUDPState::eUDP_Error)
  204. {
  205. /* 关闭接收数据 */
  206. m_isRecvData.store(false);
  207. /* 关闭UDP占用的本地端口 */
  208. if(m_udpSocket != nullptr)
  209. {
  210. m_udpSocket->close();
  211. }
  212. m_udpState = eUDPState::eUDP_None;
  213. /* 清空连接信息 */
  214. m_listClients.clear();
  215. m_localIP.clear();
  216. // m_localPort = -1;
  217. emit signal_udpClosed(m_threadInfo.cardRoadInfo.pcmInfo.strPCMName);
  218. /* 清空环形队列中的数据 */
  219. while(m_ringQueue.QueueSize() > 0)
  220. {
  221. auto data = m_ringQueue.front_pop_noBlock();
  222. if(data != nullptr)
  223. {
  224. delete data;
  225. data = nullptr;
  226. }
  227. }
  228. }
  229. return true;
  230. }
  231. /**
  232. * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去
  233. * 然后紧接着获取第二个缓冲区的数据,也一起发送出去
  234. *
  235. * @return true
  236. * @return false
  237. */
  238. bool RTPOneRoadThread::sendData()
  239. {
  240. while(m_ringQueue.QueueSize() > 0)
  241. {
  242. // SPDLOG_LOGGER_TRACE(m_logger_local, "{} 发送音频数据", m_logBase);
  243. auto pData = m_ringQueue.front_pop_noBlock();
  244. if(pData == nullptr)
  245. {
  246. continue;
  247. }
  248. if(pData->isEmpty())
  249. {
  250. delete pData;
  251. continue;
  252. }
  253. std::lock_guard<std::mutex> lock(m_lockClients);
  254. /* 遍历所有的UDP会话,发送数据 */
  255. for(const auto& session : m_listClients)
  256. {
  257. // qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
  258. // QHostAddress(session.clientIP), session.clientPort);
  259. qint64 bytesSent = m_udpSocket->writeDatagram(QByteArray(reinterpret_cast<const char*>(pData->pData), pData->dataSize),
  260. QHostAddress(session.clientIP), session.clientPort);
  261. if(bytesSent == -1)
  262. {
  263. SPDLOG_LOGGER_ERROR(m_logger_local, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
  264. } else
  265. {
  266. // SPDLOG_LOGGER_TRACE(m_logger_local, "{} 发送数据成功: {}:{}, 本地IP: {}:{}, 大小: {}", m_logBase,
  267. // session.clientIP.toStdString(), session.clientPort,
  268. // m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort(), bytesSent);
  269. }
  270. }
  271. /* 发送完数据后,删除这个数据 */
  272. delete pData;
  273. pData = nullptr;
  274. }
  275. return true;
  276. }
  277. /* 停止线程 */
  278. void RTPOneRoadThread::stop_thread()
  279. {
  280. if(m_sendTimer.isActive())
  281. {
  282. m_sendTimer.stop();
  283. }
  284. if(m_eventLoop.isRunning())
  285. {
  286. m_eventLoop.quit();
  287. m_eventLoop.processEvents(QEventLoop::AllEvents, 10);
  288. }
  289. }
  290. /* 定时发送数据 */
  291. void RTPOneRoadThread::do_timerSendData()
  292. {
  293. if(m_isRunning.load() == false)
  294. {
  295. stop_thread();
  296. }
  297. if(m_udpState == eUDPState::eUDP_None)
  298. {
  299. return;
  300. }
  301. processUdpState();
  302. /* 发送数据 */
  303. sendData();
  304. }
  305. /* UDP错误槽函数 */
  306. void RTPOneRoadThread::do_udpError(QAbstractSocket::SocketError socketError)
  307. {
  308. auto senderSocket = qobject_cast<QUdpSocket*>(sender());
  309. // SPDLOG_LOGGER_ERROR(m_logger_local, "{} UDP套接字错误: {}", m_logBase, static_cast<int>(socketError));
  310. /* 这里可以根据需要处理不同的错误 */
  311. if(socketError == QAbstractSocket::SocketError::SocketTimeoutError)
  312. {
  313. SPDLOG_LOGGER_WARN(m_logger_local, "{} UDP连接超时", m_logBase);
  314. } else
  315. {
  316. SPDLOG_LOGGER_ERROR(m_logger_local, "{} UDP套接字发生错误: {}", m_logBase, static_cast<int>(socketError));
  317. SPDLOG_LOGGER_ERROR(m_logger_local, "{} 错误信息: {}", m_logBase, senderSocket->errorString().toStdString());
  318. }
  319. m_udpState = eUDPState::eUDP_Error;
  320. }