|
@@ -4,6 +4,7 @@
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
|
RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
|
|
|
: BaseRecordThread(threadInfo),
|
|
|
m_eventLoop()
|
|
@@ -17,7 +18,7 @@ RTPOneRoadThread::RTPOneRoadThread(RecordThreadInfo_t& threadInfo)
|
|
|
m_logBase = fmt::format("录音通道: {}:{}", m_threadInfo.cardRoadInfo.strSoundCardName.toStdString(),
|
|
|
m_threadInfo.cardRoadInfo.roadInfo.nRoadNum);
|
|
|
|
|
|
- m_isRunning.store(true); // 设置为运行状态
|
|
|
+
|
|
|
}
|
|
|
|
|
|
RTPOneRoadThread::~RTPOneRoadThread()
|
|
@@ -35,11 +36,15 @@ void RTPOneRoadThread::stopThread()
|
|
|
{
|
|
|
m_sendTimer.stop();
|
|
|
}
|
|
|
- while(m_eventLoop.isRunning())
|
|
|
+ if(m_eventLoop.isRunning())
|
|
|
{
|
|
|
m_eventLoop.quit();
|
|
|
m_eventLoop.processEvents(QEventLoop::AllEvents, 10);
|
|
|
}
|
|
|
+ while(m_isRunning.load())
|
|
|
+ {
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/* 设置数据 */
|
|
@@ -51,7 +56,6 @@ bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- /* 如果数据列表1被占用,就写入数据列表2 */
|
|
|
AudioSrcData* data = new AudioSrcData(srcData);
|
|
|
if(data == nullptr)
|
|
|
{
|
|
@@ -66,63 +70,54 @@ bool RTPOneRoadThread::setData(const AudioSrcData& srcData)
|
|
|
bool RTPOneRoadThread::addUdpSession(const RtpSendClientInfo_t& udpSession)
|
|
|
{
|
|
|
/* 检查是否已有相同的会话,这里只需要检查IP和端口即可,会话ID和名称在这里没什么用 */
|
|
|
- for(const auto& session : m_listUdpSockets)
|
|
|
+ for(const auto& session : m_listClients)
|
|
|
{
|
|
|
if(session.clientIP == udpSession.clientIP && session.clientPort == udpSession.clientPort)
|
|
|
{
|
|
|
SPDLOG_LOGGER_DEBUG(m_logger, "{} 已存在相同的UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
|
|
|
- return true;
|
|
|
+ return false;
|
|
|
}
|
|
|
}
|
|
|
+ SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
|
|
|
|
|
|
- /* 创建新的UDP套接字 */
|
|
|
- QUdpSocket* udpSocket = new QUdpSocket();
|
|
|
- /* 绑定本地IP和端口 */
|
|
|
- if(!udpSocket->bind(QHostAddress::Any, udpSession.localPort))
|
|
|
+ /* 绑定UDP本地端口 */
|
|
|
+ if(m_localPort < 0)
|
|
|
{
|
|
|
- SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
|
|
|
- delete udpSocket;
|
|
|
- return false;
|
|
|
+ m_udpState = eUDPState::eUDP_Init;
|
|
|
+ m_localIP = udpSession.localIP;
|
|
|
+ m_localPort = udpSession.localPort;
|
|
|
}
|
|
|
- SPDLOG_LOGGER_INFO(m_logger, "{} 添加UDP会话: {}:{}", m_logBase, udpSession.clientIP.toStdString(), udpSession.clientPort);
|
|
|
- // 将新的会话添加到列表中
|
|
|
- RtpSendClientInfo_t newSession = udpSession;
|
|
|
- newSession.udpSocket = udpSocket;
|
|
|
-
|
|
|
+
|
|
|
m_lockUdpSockets.lock();
|
|
|
- m_listUdpSockets.append(newSession);
|
|
|
+ m_listClients.append(udpSession);
|
|
|
m_lockUdpSockets.unlock();
|
|
|
-
|
|
|
- m_isRecvData.store(true); // 设置为接收数据状态
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
/* 删除一个会话 */
|
|
|
-bool RTPOneRoadThread::removeUdpSession(const RtpSendClientInfo_t& udpSession)
|
|
|
+bool RTPOneRoadThread::removeUdpSession(QString clientIP, quint16 clientPort)
|
|
|
{
|
|
|
- std::lock_guard<QMutex> lock(m_lockUdpSockets);
|
|
|
- for(auto it = m_listUdpSockets.begin(); it != m_listUdpSockets.end(); ++it)
|
|
|
+ std::lock_guard<std::mutex> lock(m_lockUdpSockets);
|
|
|
+ for(auto it = m_listClients.begin(); it != m_listClients.end(); ++it)
|
|
|
{
|
|
|
- if(it->clientIP == udpSession.clientIP && it->clientPort == udpSession.clientPort)
|
|
|
+ if(it->clientIP == clientIP && it->clientPort == clientPort)
|
|
|
{
|
|
|
- if(it->udpSocket != nullptr)
|
|
|
- {
|
|
|
- it->udpSocket->close();
|
|
|
- delete it->udpSocket;
|
|
|
- it->udpSocket = nullptr;
|
|
|
- }
|
|
|
+ // if(it->udpSocket != nullptr)
|
|
|
+ // {
|
|
|
+ // it->udpSocket->close();
|
|
|
+ // delete it->udpSocket;
|
|
|
+ // it->udpSocket = nullptr;
|
|
|
+ // }
|
|
|
SPDLOG_LOGGER_INFO(m_logger, "{} 删除UDP会话: {}:{}", m_logBase, it->clientIP.toStdString(), it->clientPort);
|
|
|
// 从列表中删除该会话
|
|
|
- m_listUdpSockets.erase(it);
|
|
|
+ m_listClients.erase(it);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
/* 如果列表为空,就不再接收数据 */
|
|
|
- if(m_listUdpSockets.isEmpty())
|
|
|
+ if(m_listClients.isEmpty())
|
|
|
{
|
|
|
- m_isRecvData.store(false);
|
|
|
- SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
|
|
|
/* 清空环形队列 */
|
|
|
while(m_ringQueue.QueueSize() > 0)
|
|
|
{
|
|
@@ -133,6 +128,9 @@ bool RTPOneRoadThread::removeUdpSession(const RtpSendClientInfo_t& udpSession)
|
|
|
data = nullptr;
|
|
|
}
|
|
|
}
|
|
|
+ m_udpState = eUDPState::eUDP_Closed;
|
|
|
+
|
|
|
+ SPDLOG_LOGGER_DEBUG(m_logger, "{} UDP会话列表为空,不再接收音频原始数据", m_logBase);
|
|
|
}
|
|
|
|
|
|
return true;
|
|
@@ -153,10 +151,13 @@ void RTPOneRoadThread::task()
|
|
|
m_sendTimer.setInterval(10); // 每10毫秒发送一次数据
|
|
|
QEventLoop::connect(&m_sendTimer, &QTimer::timeout, this, &RTPOneRoadThread::do_timerSendData);
|
|
|
m_sendTimer.start();
|
|
|
+
|
|
|
+ m_isRunning.store(true); // 设置为运行状态
|
|
|
/* 将线程控制权交给Qt的事件循环 */
|
|
|
m_eventLoop.exec();
|
|
|
/* 清空数据 */
|
|
|
clearData();
|
|
|
+ m_isRunning.store(false);
|
|
|
SPDLOG_LOGGER_WARN(m_logger, "➢ {} RTP发送数据线程结束 ", m_logBase);
|
|
|
}
|
|
|
|
|
@@ -164,10 +165,10 @@ void RTPOneRoadThread::task()
|
|
|
/* 初始化数据 */
|
|
|
bool RTPOneRoadThread::initData()
|
|
|
{
|
|
|
- m_listUdpSockets.clear();
|
|
|
+ m_listClients.clear();
|
|
|
m_isRecvData.store(false);
|
|
|
|
|
|
- m_ringQueue.setQueueCapacity(512);
|
|
|
+ m_ringQueue.setQueueCapacity(60);
|
|
|
m_ringQueue.setDefaultValue(nullptr);
|
|
|
|
|
|
return true;
|
|
@@ -178,17 +179,17 @@ void RTPOneRoadThread::clearData()
|
|
|
{
|
|
|
m_isRecvData.store(false); // 设置为不接收数据状态
|
|
|
/* 清空UDP会话列表 */
|
|
|
- std::lock_guard<QMutex> lock(m_lockUdpSockets);
|
|
|
- for(auto& session : m_listUdpSockets)
|
|
|
- {
|
|
|
- if(session.udpSocket != nullptr)
|
|
|
- {
|
|
|
- session.udpSocket->close();
|
|
|
- delete session.udpSocket;
|
|
|
- session.udpSocket = nullptr;
|
|
|
- }
|
|
|
- }
|
|
|
- m_listUdpSockets.clear();
|
|
|
+ std::lock_guard<std::mutex> lock(m_lockUdpSockets);
|
|
|
+ // for(auto& session : m_listClients)
|
|
|
+ // {
|
|
|
+ // if(session.udpSocket != nullptr)
|
|
|
+ // {
|
|
|
+ // session.udpSocket->close();
|
|
|
+ // delete session.udpSocket;
|
|
|
+ // session.udpSocket = nullptr;
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ m_listClients.clear();
|
|
|
|
|
|
/* 清空环形队列 */
|
|
|
while(m_ringQueue.QueueSize() > 0)
|
|
@@ -202,6 +203,58 @@ void RTPOneRoadThread::clearData()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+/* 处理UDP状态 */
|
|
|
+bool RTPOneRoadThread::processUdpState()
|
|
|
+{
|
|
|
+ if(m_udpSocket == nullptr)
|
|
|
+ {
|
|
|
+ m_udpSocket = new QUdpSocket();
|
|
|
+ if(m_udpSocket == nullptr)
|
|
|
+ {
|
|
|
+ SPDLOG_LOGGER_ERROR(m_logger, "{} 创建UDP套接字失败", m_logBase);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ m_udpSocket->setSocketOption(QAbstractSocket::LowDelayOption, 1); // 设置低延迟选项
|
|
|
+ connect(m_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(do_udpError(QAbstractSocket::SocketError)));
|
|
|
+ }
|
|
|
+
|
|
|
+ if(m_udpState == eUDPState::eUDP_Init)
|
|
|
+ {
|
|
|
+ if(!m_udpSocket->bind(QHostAddress(m_localIP), m_localPort))
|
|
|
+ {
|
|
|
+ SPDLOG_LOGGER_ERROR(m_logger, "{} 绑定UDP套接字失败: {}:{}", m_logBase, m_localIP.toStdString(), m_localPort);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ SPDLOG_LOGGER_INFO(m_logger, "{} 绑定UDP套接字到本地IP: {}:{}", m_logBase,
|
|
|
+ m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort());
|
|
|
+ /* 设置为接收数据状态 */
|
|
|
+ m_isRecvData.store(true);
|
|
|
+ m_udpState = eUDPState::eUDP_Opened;
|
|
|
+ }
|
|
|
+ else if(m_udpState == eUDPState::eUDP_Closed)
|
|
|
+ {
|
|
|
+ /* 关闭接收 */
|
|
|
+ m_isRecvData.store(false);
|
|
|
+ /* 关闭UDP占用的本地端口 */
|
|
|
+ if(m_udpSocket != nullptr)
|
|
|
+ {
|
|
|
+ m_udpSocket->close();
|
|
|
+ }
|
|
|
+ m_udpState = eUDPState::eUDP_None;
|
|
|
+ /* 清空连接信息 */
|
|
|
+ m_listClients.clear();
|
|
|
+ m_localIP.clear();
|
|
|
+ m_localPort = -1;
|
|
|
+ emit signal_udpClosed(m_threadInfo.cardRoadInfo.nSoundCardNum,
|
|
|
+ m_threadInfo.cardRoadInfo.roadInfo.nRoadNum, m_localPort);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* @brief 发送数据,先获取一个缓冲区的数据,获取到后就遍历所有的UDP会话,将这个缓冲区所有的数据都发送出去
|
|
|
* 然后紧接着获取第二个缓冲区的数据,也一起发送出去
|
|
@@ -211,8 +264,9 @@ void RTPOneRoadThread::clearData()
|
|
|
*/
|
|
|
bool RTPOneRoadThread::sendData()
|
|
|
{
|
|
|
- while(m_ringQueue.QueueSize() > 0 && m_isRunning.load())
|
|
|
+ while(m_ringQueue.QueueSize() > 0)
|
|
|
{
|
|
|
+ // SPDLOG_LOGGER_TRACE(m_logger, "{} 发送音频数据", m_logBase);
|
|
|
auto data = m_ringQueue.front_pop_noBlock();
|
|
|
if(data == nullptr)
|
|
|
{
|
|
@@ -223,25 +277,28 @@ bool RTPOneRoadThread::sendData()
|
|
|
delete data;
|
|
|
continue;
|
|
|
}
|
|
|
- std::lock_guard<QMutex> lock(m_lockUdpSockets);
|
|
|
+ std::lock_guard<std::mutex> lock(m_lockUdpSockets);
|
|
|
/* 遍历所有的UDP会话,发送数据 */
|
|
|
- for(const auto& session : m_listUdpSockets)
|
|
|
+ for(const auto& session : m_listClients)
|
|
|
{
|
|
|
- if(session.udpSocket == nullptr || !session.udpSocket->isValid())
|
|
|
- {
|
|
|
- SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
|
|
|
- continue;
|
|
|
- }
|
|
|
+ // if(session.udpSocket == nullptr || !session.udpSocket->isValid())
|
|
|
+ // {
|
|
|
+ // SPDLOG_LOGGER_WARN(m_logger, "{} 无效的UDP套接字: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
|
|
|
+ // continue;
|
|
|
+ // }
|
|
|
|
|
|
- qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
|
|
|
+ // qint64 bytesSent = session.udpSocket->writeDatagram(data->pData, data->dataSize,
|
|
|
+ // QHostAddress(session.clientIP), session.clientPort);
|
|
|
+ qint64 bytesSent = m_udpSocket->writeDatagram(QByteArray(reinterpret_cast<const char*>(data->pData), data->dataSize),
|
|
|
QHostAddress(session.clientIP), session.clientPort);
|
|
|
if(bytesSent == -1)
|
|
|
{
|
|
|
SPDLOG_LOGGER_ERROR(m_logger, "{} 发送数据失败: {}:{}", m_logBase, session.clientIP.toStdString(), session.clientPort);
|
|
|
- }
|
|
|
- else
|
|
|
+ } else
|
|
|
{
|
|
|
- SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送数据成功: {}:{} 大小: {}", m_logBase, session.clientIP.toStdString(), session.clientPort, bytesSent);
|
|
|
+ SPDLOG_LOGGER_TRACE(m_logger, "{} 发送数据成功: {}:{}, 本地IP: {}:{}, 大小: {}", m_logBase,
|
|
|
+ session.clientIP.toStdString(), session.clientPort,
|
|
|
+ m_udpSocket->localAddress().toString().toStdString(), m_udpSocket->localPort(), bytesSent);
|
|
|
}
|
|
|
}
|
|
|
/* 发送完数据后,删除这个数据 */
|
|
@@ -255,13 +312,38 @@ bool RTPOneRoadThread::sendData()
|
|
|
/* 定时发送数据 */
|
|
|
void RTPOneRoadThread::do_timerSendData()
|
|
|
{
|
|
|
- if(!m_isRecvData)
|
|
|
+ if(m_udpState == eUDPState::eUDP_None)
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
- /* 先初始化UDP */
|
|
|
-
|
|
|
+
|
|
|
+ processUdpState();
|
|
|
+
|
|
|
+
|
|
|
/* 发送数据 */
|
|
|
sendData();
|
|
|
}
|
|
|
|
|
|
+/* UDP错误槽函数 */
|
|
|
+void RTPOneRoadThread::do_udpError(QAbstractSocket::SocketError socketError)
|
|
|
+{
|
|
|
+ auto senderSocket = qobject_cast<QUdpSocket*>(sender());
|
|
|
+ // SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字错误: {}", m_logBase, static_cast<int>(socketError));
|
|
|
+ /* 这里可以根据需要处理不同的错误 */
|
|
|
+ if(socketError == QAbstractSocket::SocketError::SocketTimeoutError)
|
|
|
+ {
|
|
|
+ SPDLOG_LOGGER_WARN(m_logger, "{} UDP连接超时", m_logBase);
|
|
|
+ } else
|
|
|
+ {
|
|
|
+ SPDLOG_LOGGER_ERROR(m_logger, "{} UDP套接字发生错误: {}", m_logBase, static_cast<int>(socketError));
|
|
|
+ }
|
|
|
+ /* 获取UDP远程连接的IP和端口 */
|
|
|
+ if(senderSocket != nullptr)
|
|
|
+ {
|
|
|
+ QString remoteIP = senderSocket->peerAddress().toString();
|
|
|
+ quint16 remotePort = senderSocket->peerPort();
|
|
|
+ /* 从队列中移除这个会话 */
|
|
|
+ removeUdpSession(remoteIP, remotePort);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|