123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- #include "RtpOneRoadThread.h"
- #include "spdlog.h"
- RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
- : BaseRecordThread(threadInfo),
- m_eventLoop()
- {
- m_logger = spdlog::get("RTPServer");
- if(m_logger == nullptr)
- {
- fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器");
- return;
- }
- m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(),
- m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
-
- m_isRunning.store(true); // 设置为运行状态
- }
- RTPOneRoadThread::~RTPOneRoadThread()
- {
- }
- /**
- * @brief 停止线程
- */
- void RTPOneRoadThread::stopThread()
- {
- if(m_sendTimer.isActive())
- {
- m_sendTimer.stop();
- }
- while(m_eventLoop.isRunning())
- {
- m_eventLoop.quit();
- m_eventLoop.processEvents(QEventLoop::AllEvents, 10);
- }
- }
- /* 设置数据 */
- bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
- {
- /* UDP会话列表是空的,就不接收数据了 */
- if(!m_isRecvData.load())
- {
- return true;
- }
- /* 如果数据列表1被占用,就写入数据列表2 */
- AudioSrcData* data = new AudioSrcData(srcData);
- if(data == nullptr)
- {
- return false;
- }
- m_ringQueue.push(data);
- return false;
- }
- /* 添加一个UDP会话 */
- bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession)
- {
- /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */
- for(const auto& session : m_listUdpSockets)
- {
- if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort)
- {
- SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
- return true;
- }
- }
- /* 创建新的UDP套接字 */
- QUdpSocket* udpSocket = new QUdpSocket();
- /* 绑定本地IP和端口 */
- if(!udpSocket->bind(QHostAddress::Any, udpSession.localPort))
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
- delete udpSocket;
- return false;
- }
- SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
- // 将新的会话添加到列表中
- RtpSendClientInfo_t newSession = udpSession;
- newSession.udpSocket = udpSocket;
- m_lockUdpSockets.lock();
- m_listUdpSockets.append(newSession);
- m_lockUdpSockets.unlock();
-
- m_isRecvData.store(true); // 设置为接收数据状态
- return true;
- }
- /* 删除一个会话 */
- bool RTPOneRoadThread::removeUdpSession(const RtpSendClientInfo_t& udpSession)
- {
- std::lock_guard<QMutex> lock(m_lockUdpSockets);
- for(auto it = m_listUdpSockets.begin(); it != m_listUdpSockets.end(); ++it)
- {
- if(it->clientIP == udpSession.clientIP && it->clientPort == udpSession.clientPort)
- {
- if(it->udpSocket != nullptr)
- {
- it->udpSocket->close();
- delete it->udpSocket;
- it->udpSocket = nullptr;
- }
- SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort);
- // 从列表中删除该会话
- m_listUdpSockets.erase(it);
- break;
- }
- }
- /* 如果列表为空,就不再接收数据 */
- if(m_listUdpSockets.isEmpty())
- {
- m_isRecvData.store(false);
- SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
- /* 清空环形队列 */
- while(m_ringQueue.QueueSize() > 0)
- {
- auto data = m_ringQueue.front_pop_noBlock();
- if(data != nullptr)
- {
- delete data;
- data = nullptr;
- }
- }
- }
- return true;
- }
- /* 发送数据的线程函数 */
- void RTPOneRoadThread::task()
- {
- SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开启RTP发送数据线程 ", m_logBase);
- if(!initData())
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
- return;
- }
- m_sendTimer.setTimerType(Qt::PreciseTimer);
- m_sendTimer.setSingleShot(false); // 设置为循环定时器
- m_sendTimer.setInterval(10); // 每10毫秒发送一次数据
- QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData);
- m_sendTimer.start();
- /* 将线程控制权交给Qt的事件循环 */
- m_eventLoop.exec();
- /* 清空数据 */
- clearData();
- SPDLOG_LOGGER_WARN(m_logger, "➢ {} RTP发送数据线程结束 ", m_logBase);
- }
- /* 初始化数据 */
- bool RTPOneRoadThread::initData()
- {
- m_listUdpSockets.clear();
- m_isRecvData.store(false);
- m_ringQueue.setQueueCapacity(512);
- m_ringQueue.setDefaultValue(nullptr);
- return true;
- }
- /* 清除数据*/
- void RTPOneRoadThread::clearData()
- {
- m_isRecvData.store(false); // 设置为不接收数据状态
- /* 清空UDP会话列表 */
- std::lock_guard<QMutex> lock(m_lockUdpSockets);
- for(auto& session : m_listUdpSockets)
- {
- if(session.udpSocket != nullptr)
- {
- session.udpSocket->close();
- delete session.udpSocket;
- session.udpSocket = nullptr;
- }
- }
- m_listUdpSockets.clear();
-
- /* 清空环形队列 */
- while(m_ringQueue.QueueSize() > 0)
- {
- auto data = m_ringQueue.front_pop_noBlock();
- if(data != nullptr)
- {
- delete data;
- data = nullptr;
- }
- }
- }
- /**
- * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去
- * 然后紧接着获取第二个缓冲区的数据,也一起发送出去
- *
- * @return true
- * @return false
- */
- bool RTPOneRoadThread::sendData()
- {
- while(m_ringQueue.QueueSize() > 0 && m_isRunning.load())
- {
- auto data = m_ringQueue.front_pop_noBlock();
- if(data == nullptr)
- {
- continue;
- }
- if(data->isEmpty())
- {
- delete data;
- continue;
- }
- std::lock_guard<QMutex> lock(m_lockUdpSockets);
- /* 遍历所有的UDP会话,发送数据 */
- for(const auto& session : m_listUdpSockets)
- {
- if(session.udpSocket == nullptr || !session.udpSocket->isValid())
- {
- SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
- continue;
- }
- qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
- QHostAddress(session.clientIP), session.clientPort);
- if(bytesSent == -1)
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
- }
- else
- {
- SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送数据成功: {}:{} 大小: {}", m_logBase, session.clientIP.toStdString(), session.clientPort, bytesSent);
- }
- }
- /* 发送完数据后,删除这个数据 */
- delete data;
- data = nullptr;
- }
- return true;
- }
- /* 定时发送数据 */
- void RTPOneRoadThread::do_timerSendData()
- {
- if(!m_isRecvData)
- {
- return;
- }
- /* 先初始化UDP */
- /* 发送数据 */
- sendData();
- }
|