#include "RtpOneRoadThread.h" #include "spdlog.h" RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo) : BaseRecordThread(threadInfo), m_eventLoop() { m_logger = spdlog::get("RTPServer"); if(m_logger == nullptr) { fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器"); return; } m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(), m_threadInfo.cardRoadInfo.roadInfo.nRoadNum); } RTPOneRoadThread::~RTPOneRoadThread() { } /** * @brief 停止线程 */ void RTPOneRoadThread::stopThread() { if(m_sendTimer.isActive()) { m_sendTimer.stop(); } if(m_eventLoop.isRunning()) { m_eventLoop.quit(); m_eventLoop.processEvents(QEventLoop::AllEvents, 10); } while(m_isRunning.load()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } /* 设置数据 */ bool RTPOneRoadThread::setData(const AudioSrcData& srcData) { /* UDP会话列表是空的,就不接收数据了 */ if(!m_isRecvData.load()) { return true; } AudioSrcData* data = new AudioSrcData(srcData); if(data == nullptr) { return false; } m_ringQueue.push(data); return false; } /* 添加一个UDP会话 */ bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession) { /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */ for(const auto& session : m_listClients) { if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort) { SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort); return false; } } SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort); /* 绑定UDP本地端口 */ if(m_localPort < 0) { m_udpState = eUDPState::eUDP_Init; m_localIP = udpSession.localIP; m_localPort = udpSession.localPort; } m_lockUdpSockets.lock(); m_listClients.append(udpSession); m_lockUdpSockets.unlock(); return true; } /* 删除一个会话 */ bool RTPOneRoadThread::removeUdpSession(QString clientIP, quint16 clientPort) { std::lock_guard lock(m_lockUdpSockets); for(auto it = m_listClients.begin(); it != m_listClients.end(); ++it) { if(it->clientIP == clientIP && it->clientPort == clientPort) { // if(it->udpSocket != nullptr) // { // it->udpSocket->close(); // delete it->udpSocket; // it->udpSocket = nullptr; // } SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort); // 从列表中删除该会话 m_listClients.erase(it); break; } } /* 如果列表为空,就不再接收数据 */ if(m_listClients.isEmpty()) { /* 清空环形队列 */ while(m_ringQueue.QueueSize() > 0) { auto data = m_ringQueue.front_pop_noBlock(); if(data != nullptr) { delete data; data = nullptr; } } m_udpState = eUDPState::eUDP_Closed; SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase); } return true; } /* 发送数据的线程函数 */ void RTPOneRoadThread::task() { SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开启RTP发送数据线程 ", m_logBase); if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } m_sendTimer.setTimerType(Qt::PreciseTimer); m_sendTimer.setSingleShot(false); // 设置为循环定时器 m_sendTimer.setInterval(10); // 每10毫秒发送一次数据 QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData); m_sendTimer.start(); m_isRunning.store(true); // 设置为运行状态 /* 将线程控制权交给Qt的事件循环 */ m_eventLoop.exec(); /* 清空数据 */ clearData(); m_isRunning.store(false); SPDLOG_LOGGER_WARN(m_logger, "➢ {} RTP发送数据线程结束 ", m_logBase); } /* 初始化数据 */ bool RTPOneRoadThread::initData() { m_listClients.clear(); m_isRecvData.store(false); m_ringQueue.setQueueCapacity(60); m_ringQueue.setDefaultValue(nullptr); return true; } /* 清除数据*/ void RTPOneRoadThread::clearData() { m_isRecvData.store(false); // 设置为不接收数据状态 /* 清空UDP会话列表 */ std::lock_guard lock(m_lockUdpSockets); // for(auto& session : m_listClients) // { // if(session.udpSocket != nullptr) // { // session.udpSocket->close(); // delete session.udpSocket; // session.udpSocket = nullptr; // } // } m_listClients.clear(); /* 清空环形队列 */ while(m_ringQueue.QueueSize() > 0) { auto data = m_ringQueue.front_pop_noBlock(); if(data != nullptr) { delete data; data = nullptr; } } } /* 处理UDP状态 */ bool RTPOneRoadThread::processUdpState() { if(m_udpSocket == nullptr) { m_udpSocket = new QUdpSocket(); if(m_udpSocket == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败", m_logBase); return false; } m_udpSocket->setSocketOption(QAbstractSocket::LowDelayOption, 1); // 设置低延迟选项 connect(m_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(do_udpError(QAbstractSocket::SocketError))); } if(m_udpState == eUDPState::eUDP_Init) { if(!m_udpSocket->bind(QHostAddress(m_localIP))) { SPDLOG_LOGGER_ERROR(m_logger, "{} 绑定UDP套接字失败: {}:{}", m_logBase, m_localIP.toStdString(), m_udpSocket->localPort()); return false; } SPDLOG_LOGGER_INFO(m_logger, "{} 绑定UDP套接字到本地IP: {}:{}", m_logBase, m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort()); /* 设置为接收数据状态 */ m_isRecvData.store(true); m_udpState = eUDPState::eUDP_Opened; } else if(m_udpState == eUDPState::eUDP_Closed) { /* 关闭接收 */ m_isRecvData.store(false); /* 关闭UDP占用的本地端口 */ if(m_udpSocket != nullptr) { m_udpSocket->close(); } m_udpState = eUDPState::eUDP_None; /* 清空连接信息 */ m_listClients.clear(); m_localIP.clear(); m_localPort = -1; emit signal_udpClosed(m_threadInfo.cardRoadInfo.nSoundCardNum, m_threadInfo.cardRoadInfo.roadInfo.nRoadNum, m_localPort); } return true; } /** * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去 * 然后紧接着获取第二个缓冲区的数据,也一起发送出去 * * @return true * @return false */ bool RTPOneRoadThread::sendData() { while(m_ringQueue.QueueSize() > 0) { // SPDLOG_LOGGER_TRACE(m_logger, "{} 发送音频数据", m_logBase); auto data = m_ringQueue.front_pop_noBlock(); if(data == nullptr) { continue; } if(data->isEmpty()) { delete data; continue; } std::lock_guard lock(m_lockUdpSockets); /* 遍历所有的UDP会话,发送数据 */ for(const auto& session : m_listClients) { // if(session.udpSocket == nullptr || !session.udpSocket->isValid()) // { // SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort); // continue; // } // qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize, // QHostAddress(session.clientIP), session.clientPort); qint64 bytesSent = m_udpSocket->writeDatagram(QByteArray(reinterpret_cast(data->pData), data->dataSize), QHostAddress(session.clientIP), session.clientPort); if(bytesSent == -1) { SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort); } else { SPDLOG_LOGGER_TRACE(m_logger, "{} 发送数据成功: {}:{}, 本地IP: {}:{}, 大小: {}", m_logBase, session.clientIP.toStdString(), session.clientPort, m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort(), bytesSent); } } /* 发送完数据后,删除这个数据 */ delete data; data = nullptr; } return true; } /* 定时发送数据 */ void RTPOneRoadThread::do_timerSendData() { if(m_udpState == eUDPState::eUDP_None) { return; } processUdpState(); /* 发送数据 */ sendData(); } /* UDP错误槽函数 */ void RTPOneRoadThread::do_udpError(QAbstractSocket::SocketError socketError) { auto senderSocket = qobject_cast(sender()); // SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字错误: {}", m_logBase, static_cast(socketError)); /* 这里可以根据需要处理不同的错误 */ if(socketError == QAbstractSocket::SocketError::SocketTimeoutError) { SPDLOG_LOGGER_WARN(m_logger, "{} UDP连接超时", m_logBase); } else { SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字发生错误: {}", m_logBase, static_cast(socketError)); } /* 获取UDP远程连接的IP和端口 */ if(senderSocket != nullptr) { QString remoteIP = senderSocket->peerAddress().toString(); quint16 remotePort = senderSocket->peerPort(); /* 从队列中移除这个会话 */ removeUdpSession(remoteIP, remotePort); } }