#include "RtpOneRoadThread.h" #include "spdlog.h" RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo) : BaseRecordThread(threadInfo), m_eventLoop() { m_logger = spdlog::get("RTPServer"); if(m_logger == nullptr) { fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器"); return; } m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(), m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); m_isRunning.store(true); // 设置为运行状态 } RTPOneRoadThread::~RTPOneRoadThread() { } /** * @brief 停止线程 */ void RTPOneRoadThread::stopThread() { if(m_sendTimer.isActive()) { m_sendTimer.stop(); } while(m_eventLoop.isRunning()) { m_eventLoop.quit(); m_eventLoop.processEvents(QEventLoop::AllEvents, 10); } } /* 设置数据 */ bool RTPOneRoadThread::setData(const AudioSrcData& srcData) { /* UDP会话列表是空的,就不接收数据了 */ if(!m_isRecvData.load()) { return true; } /* 如果数据列表1被占用,就写入数据列表2 */ AudioSrcData* data = new AudioSrcData(srcData); if(data == nullptr) { return false; } m_ringQueue.push(data); return false; } /* 添加一个UDP会话 */ bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession) { /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */ for(const auto& session : m_listUdpSockets) { if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort) { SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort); return true; } } /* 创建新的UDP套接字 */ QUdpSocket* udpSocket = new QUdpSocket(); /* 绑定本地IP和端口 */ if(!udpSocket->bind(QHostAddress::Any, udpSession.localPort)) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort); delete udpSocket; return false; } SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort); // 将新的会话添加到列表中 RtpSendClientInfo_t newSession = udpSession; newSession.udpSocket = udpSocket; m_lockUdpSockets.lock(); m_listUdpSockets.append(newSession); m_lockUdpSockets.unlock(); m_isRecvData.store(true); // 设置为接收数据状态 return true; } /* 删除一个会话 */ bool RTPOneRoadThread::removeUdpSession(const RtpSendClientInfo_t& udpSession) { std::lock_guard lock(m_lockUdpSockets); for(auto it = m_listUdpSockets.begin(); it != m_listUdpSockets.end(); ++it) { if(it->clientIP == udpSession.clientIP && it->clientPort == udpSession.clientPort) { if(it->udpSocket != nullptr) { it->udpSocket->close(); delete it->udpSocket; it->udpSocket = nullptr; } SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort); // 从列表中删除该会话 m_listUdpSockets.erase(it); break; } } /* 如果列表为空,就不再接收数据 */ if(m_listUdpSockets.isEmpty()) { m_isRecvData.store(false); SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase); /* 清空环形队列 */ while(m_ringQueue.QueueSize() > 0) { auto data = m_ringQueue.front_pop_noBlock(); if(data != nullptr) { delete data; data = nullptr; } } } return true; } /* 发送数据的线程函数 */ void RTPOneRoadThread::task() { SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开启RTP发送数据线程 ", m_logBase); if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } m_sendTimer.setTimerType(Qt::PreciseTimer); m_sendTimer.setSingleShot(false); // 设置为循环定时器 m_sendTimer.setInterval(10); // 每10毫秒发送一次数据 QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData); m_sendTimer.start(); /* 将线程控制权交给Qt的事件循环 */ m_eventLoop.exec(); /* 清空数据 */ clearData(); SPDLOG_LOGGER_WARN(m_logger, "➢ {} RTP发送数据线程结束 ", m_logBase); } /* 初始化数据 */ bool RTPOneRoadThread::initData() { m_listUdpSockets.clear(); m_isRecvData.store(false); m_ringQueue.setQueueCapacity(512); m_ringQueue.setDefaultValue(nullptr); return true; } /* 清除数据*/ void RTPOneRoadThread::clearData() { m_isRecvData.store(false); // 设置为不接收数据状态 /* 清空UDP会话列表 */ std::lock_guard lock(m_lockUdpSockets); for(auto& session : m_listUdpSockets) { if(session.udpSocket != nullptr) { session.udpSocket->close(); delete session.udpSocket; session.udpSocket = nullptr; } } m_listUdpSockets.clear(); /* 清空环形队列 */ while(m_ringQueue.QueueSize() > 0) { auto data = m_ringQueue.front_pop_noBlock(); if(data != nullptr) { delete data; data = nullptr; } } } /** * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去 * 然后紧接着获取第二个缓冲区的数据,也一起发送出去 * * @return true * @return false */ bool RTPOneRoadThread::sendData() { while(m_ringQueue.QueueSize() > 0 && m_isRunning.load()) { auto data = m_ringQueue.front_pop_noBlock(); if(data == nullptr) { continue; } if(data->isEmpty()) { delete data; continue; } std::lock_guard lock(m_lockUdpSockets); /* 遍历所有的UDP会话,发送数据 */ for(const auto& session : m_listUdpSockets) { if(session.udpSocket == nullptr || !session.udpSocket->isValid()) { SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort); continue; } qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize, QHostAddress(session.clientIP), session.clientPort); if(bytesSent == -1) { SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort); } else { SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送数据成功: {}:{} 大小: {}", m_logBase, session.clientIP.toStdString(), session.clientPort, bytesSent); } } /* 发送完数据后,删除这个数据 */ delete data; data = nullptr; } return true; } /* 定时发送数据 */ void RTPOneRoadThread::do_timerSendData() { if(!m_isRecvData) { return; } /* 先初始化UDP */ /* 发送数据 */ sendData(); }