123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- #include "RtpOneRoadThread.h"
- #include "spdlog.h"
- RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
- : BaseRecordThread(threadInfo),
- m_eventLoop()
- {
- m_logger = spdlog::get("RTPServer");
- if(m_logger == nullptr)
- {
- fmt::print("RTPServer 日志记录器未初始化,请先初始化日志记录器");
- return;
- }
- m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(),
- m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
-
-
- }
- RTPOneRoadThread::~RTPOneRoadThread()
- {
- }
- /**
- * @brief 停止线程
- */
- void RTPOneRoadThread::stopThread()
- {
- if(m_sendTimer.isActive())
- {
- m_sendTimer.stop();
- }
- if(m_eventLoop.isRunning())
- {
- m_eventLoop.quit();
- m_eventLoop.processEvents(QEventLoop::AllEvents, 10);
- }
- while(m_isRunning.load())
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
- }
- /* 设置数据 */
- bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
- {
- /* UDP会话列表是空的,就不接收数据了 */
- if(!m_isRecvData.load())
- {
- return true;
- }
- AudioSrcData* data = new AudioSrcData(srcData);
- if(data == nullptr)
- {
- return false;
- }
- m_ringQueue.push(data);
- return false;
- }
- /* 添加一个UDP会话 */
- bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession)
- {
- /* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */
- for(const auto& session : m_listClients)
- {
- if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort)
- {
- SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
- return false;
- }
- }
- SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
- /* 绑定UDP本地端口 */
- if(m_localPort < 0)
- {
- m_udpState = eUDPState::eUDP_Init;
- m_localIP = udpSession.localIP;
- m_localPort = udpSession.localPort;
- }
-
- m_lockUdpSockets.lock();
- m_listClients.append(udpSession);
- m_lockUdpSockets.unlock();
- return true;
- }
- /* 删除一个会话 */
- bool RTPOneRoadThread::removeUdpSession(QString clientIP, quint16 clientPort)
- {
- std::lock_guard<std::mutex> lock(m_lockUdpSockets);
- for(auto it = m_listClients.begin(); it != m_listClients.end(); ++it)
- {
- if(it->clientIP == clientIP && it->clientPort == clientPort)
- {
- // if(it->udpSocket != nullptr)
- // {
- // it->udpSocket->close();
- // delete it->udpSocket;
- // it->udpSocket = nullptr;
- // }
- SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort);
- // 从列表中删除该会话
- m_listClients.erase(it);
- break;
- }
- }
- /* 如果列表为空,就不再接收数据 */
- if(m_listClients.isEmpty())
- {
- /* 清空环形队列 */
- while(m_ringQueue.QueueSize() > 0)
- {
- auto data = m_ringQueue.front_pop_noBlock();
- if(data != nullptr)
- {
- delete data;
- data = nullptr;
- }
- }
- m_udpState = eUDPState::eUDP_Closed;
-
- SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
- }
- return true;
- }
- /* 发送数据的线程函数 */
- void RTPOneRoadThread::task()
- {
- SPDLOG_LOGGER_INFO(m_logger, "➢ {} 开启RTP发送数据线程 ", m_logBase);
- if(!initData())
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase);
- return;
- }
- m_sendTimer.setTimerType(Qt::PreciseTimer);
- m_sendTimer.setSingleShot(false); // 设置为循环定时器
- m_sendTimer.setInterval(10); // 每10毫秒发送一次数据
- QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData);
- m_sendTimer.start();
- m_isRunning.store(true); // 设置为运行状态
- /* 将线程控制权交给Qt的事件循环 */
- m_eventLoop.exec();
- /* 清空数据 */
- clearData();
- m_isRunning.store(false);
- SPDLOG_LOGGER_WARN(m_logger, "➢ {} RTP发送数据线程结束 ", m_logBase);
- }
- /* 初始化数据 */
- bool RTPOneRoadThread::initData()
- {
- m_listClients.clear();
- m_isRecvData.store(false);
- m_ringQueue.setQueueCapacity(60);
- m_ringQueue.setDefaultValue(nullptr);
- return true;
- }
- /* 清除数据*/
- void RTPOneRoadThread::clearData()
- {
- m_isRecvData.store(false); // 设置为不接收数据状态
- /* 清空UDP会话列表 */
- std::lock_guard<std::mutex> lock(m_lockUdpSockets);
- // for(auto& session : m_listClients)
- // {
- // if(session.udpSocket != nullptr)
- // {
- // session.udpSocket->close();
- // delete session.udpSocket;
- // session.udpSocket = nullptr;
- // }
- // }
- m_listClients.clear();
-
- /* 清空环形队列 */
- while(m_ringQueue.QueueSize() > 0)
- {
- auto data = m_ringQueue.front_pop_noBlock();
- if(data != nullptr)
- {
- delete data;
- data = nullptr;
- }
- }
- }
- /* 处理UDP状态 */
- bool RTPOneRoadThread::processUdpState()
- {
- if(m_udpSocket == nullptr)
- {
- m_udpSocket = new QUdpSocket();
- if(m_udpSocket == nullptr)
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败", m_logBase);
- return false;
- }
- m_udpSocket->setSocketOption(QAbstractSocket::LowDelayOption, 1); // 设置低延迟选项
- connect(m_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(do_udpError(QAbstractSocket::SocketError)));
- }
- if(m_udpState == eUDPState::eUDP_Init)
- {
- if(!m_udpSocket->bind(QHostAddress(m_localIP)))
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 绑定UDP套接字失败: {}:{}", m_logBase, m_localIP.toStdString(), m_udpSocket->localPort());
- return false;
- }
- SPDLOG_LOGGER_INFO(m_logger, "{} 绑定UDP套接字到本地IP: {}:{}", m_logBase,
- m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort());
- /* 设置为接收数据状态 */
- m_isRecvData.store(true);
- m_udpState = eUDPState::eUDP_Opened;
- }
- else if(m_udpState == eUDPState::eUDP_Closed)
- {
- /* 关闭接收 */
- m_isRecvData.store(false);
- /* 关闭UDP占用的本地端口 */
- if(m_udpSocket != nullptr)
- {
- m_udpSocket->close();
- }
- m_udpState = eUDPState::eUDP_None;
- /* 清空连接信息 */
- m_listClients.clear();
- m_localIP.clear();
- m_localPort = -1;
- emit signal_udpClosed(m_threadInfo.cardRoadInfo.nSoundCardNum,
- m_threadInfo.cardRoadInfo.roadInfo.nRoadNum, m_localPort);
-
- }
- return true;
- }
- /**
- * @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去
- * 然后紧接着获取第二个缓冲区的数据,也一起发送出去
- *
- * @return true
- * @return false
- */
- bool RTPOneRoadThread::sendData()
- {
- while(m_ringQueue.QueueSize() > 0)
- {
- // SPDLOG_LOGGER_TRACE(m_logger, "{} 发送音频数据", m_logBase);
- auto data = m_ringQueue.front_pop_noBlock();
- if(data == nullptr)
- {
- continue;
- }
- if(data->isEmpty())
- {
- delete data;
- continue;
- }
- std::lock_guard<std::mutex> lock(m_lockUdpSockets);
- /* 遍历所有的UDP会话,发送数据 */
- for(const auto& session : m_listClients)
- {
- // if(session.udpSocket == nullptr || !session.udpSocket->isValid())
- // {
- // SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
- // continue;
- // }
- // qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
- // QHostAddress(session.clientIP), session.clientPort);
- qint64 bytesSent = m_udpSocket->writeDatagram(QByteArray(reinterpret_cast<const char*>(data->pData), data->dataSize),
- QHostAddress(session.clientIP), session.clientPort);
- if(bytesSent == -1)
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
- } else
- {
- SPDLOG_LOGGER_TRACE(m_logger, "{} 发送数据成功: {}:{}, 本地IP: {}:{}, 大小: {}", m_logBase,
- session.clientIP.toStdString(), session.clientPort,
- m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort(), bytesSent);
- }
- }
- /* 发送完数据后,删除这个数据 */
- delete data;
- data = nullptr;
- }
- return true;
- }
- /* 定时发送数据 */
- void RTPOneRoadThread::do_timerSendData()
- {
- if(m_udpState == eUDPState::eUDP_None)
- {
- return;
- }
-
- processUdpState();
-
-
- /* 发送数据 */
- sendData();
- }
- /* UDP错误槽函数 */
- void RTPOneRoadThread::do_udpError(QAbstractSocket::SocketError socketError)
- {
- auto senderSocket = qobject_cast<QUdpSocket*>(sender());
- // SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字错误: {}", m_logBase, static_cast<int>(socketError));
- /* 这里可以根据需要处理不同的错误 */
- if(socketError == QAbstractSocket::SocketError::SocketTimeoutError)
- {
- SPDLOG_LOGGER_WARN(m_logger, "{} UDP连接超时", m_logBase);
- } else
- {
- SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字发生错误: {}", m_logBase, static_cast<int>(socketError));
- }
- /* 获取UDP远程连接的IP和端口 */
- if(senderSocket != nullptr)
- {
- QString remoteIP = senderSocket->peerAddress().toString();
- quint16 remotePort = senderSocket->peerPort();
- /* 从队列中移除这个会话 */
- removeUdpSession(remoteIP, remotePort);
- }
- }
|