#include "ThreadManager.h" #include "CreateWAVThread.h" #include "CreateDBThread.h" #include "ConsistencyCompareThread.h" #include "GlobalVariable.h" #include "NoiseDetectThread.h" #include "RtpOneRoadThread.h" #include "CreateLongFileThread.h" #include "RecordThread.h" #include "AssignSrcDataThread.h" #include "ThreadPool.h" #include ThreadManager::ThreadManager() { m_logger = spdlog::get("ACAServer"); if(m_logger == nullptr) { fmt::print("ThreadManager: ACAServer Logger not found.\n"); return; } m_logBase = "ThreadManager"; } /* 启动所有线程 */ // void ThreadManager::startAllThreads() // { // } /* 停止所有线程 */ void ThreadManager::stopAllThreads() { } /* 创建一个录音通道及其附属的线程 */ bool ThreadManager::createRecordThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID) { /* 先查找队列中有没有该录音通道 */ std::lock_guard lock(m_mutexRecordThreadRefCount); for(const auto& pair : m_mapRecordThreadRefCount) { if( pair.first.nSoundCardNum == roadInfo.nSoundCardNum && pair.first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 录音线程已存在", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); /* 录音线程已存在,增加引用计数 先判断对比项ID是否已经在列表中了 */ bool idFound = false; for(const auto& it : pair.second) { if(it == compareItemID) { idFound = true; break; } } if(!idFound) { /* 如果对比项ID不存在,则添加到引用计数中 */ m_mapRecordThreadRefCount[pair.first].push_back(compareItemID); } SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数增加,当前计数: {}", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, m_mapRecordThreadRefCount[pair.first].size()); /* 返回成功 */ return true; } } /* 录音线程不存在,挨个创建线程 */ /* 先创建生成wav小文件数据的线程 */ RecordThreadInfo_t threadInfo; threadInfo.cardRoadInfo = roadInfo; threadInfo.threadState = EThreadState::State_Inited; threadInfo.threadType = EThreadType::Type_CreateWAV; CreateWAVThread* pCreateWAVThread = new CreateWAVThread(threadInfo); if(pCreateWAVThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成wav小文件线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); // return false; // 创建失败 }else { CPPTP.add_task(&CreateWAVThread::threadTask, pCreateWAVThread); std::lock_guard lock(m_mutexCreateWAVThreads); m_createWAVThreads.push_back(pCreateWAVThread); } /* 创建计算音量的线程 */ threadInfo.threadType = EThreadType::Type_CreateDB; CreateDBThread* pCreateDBThread = new CreateDBThread(threadInfo); if(pCreateDBThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建计算音量线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); // return false; // 创建失败 }else { CPPTP.add_task(&CreateDBThread::threadTask, pCreateDBThread); std::lock_guard lock(m_mutexCreateDBThreads); m_createDBThreads.push_back(pCreateDBThread); } /* 创建生成长文件的线程 */ threadInfo.threadType = EThreadType::Type_CreateLongWAV; CreateLongFileThread* pCreateLongWAVThread = new CreateLongFileThread(threadInfo); if(pCreateLongWAVThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成长文件线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); // return false; // 创建失败 }else { CPPTP.add_task(&CreateLongFileThread::threadTask, pCreateLongWAVThread); std::lock_guard lock(m_mutexCreateLongWAVThreads); m_createLongWAVThreads.push_back(pCreateLongWAVThread); } /* 创建发送RTP数据的线程 */ threadInfo.threadType = EThreadType::Type_RtpSend; CPPTP.add_task(&ThreadManager::thread_RTPSend, threadInfo); RTPOneRoadThread* pRtpSendThread = new RTPOneRoadThread(threadInfo); if(pRtpSendThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建发送RTP数据线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); }else { CPPTP.add_task(&RTPOneRoadThread::threadTask, pRtpSendThread); std::lock_guard lock(m_mutexRtpSendThreads); m_rtpSendThreads.push_back(pRtpSendThread); } /* 创建录音线程 */ threadInfo.threadType = EThreadType::Type_RecordSrc; RecordThread* pRecordThread = new RecordThread(threadInfo); if(pRecordThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建录音线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); // return false; // 创建失败 }else { CPPTP.add_task(&RecordThread::threadTask, pRecordThread); std::lock_guard lock(m_mutexRecordThreads); m_recordThreads.push_back(pRecordThread); } /* 创建分派数据线程 */ threadInfo.threadType = EThreadType::Type_AssignSrcData; AssignSrcDataThread* pAssignSrcDataThread = new AssignSrcDataThread(threadInfo); if(pAssignSrcDataThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建分派数据线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); // return false; // 创建失败 }else { CPPTP.add_task(&AssignSrcDataThread::threadTask, pAssignSrcDataThread); std::lock_guard lock(m_mutexAssignSrcDataThreads); m_assignSrcDataThreads.push_back(pAssignSrcDataThread); } /* 录音线程创建成功,增加引用计数 */ m_mapRecordThreadRefCount[roadInfo].push_back(compareItemID); return true; } /** 销毁一个录音通道及其附属的线程,如果引用计数为0, 这里只停止该录音通道的所有线程,不会删除实例 线程实例会由其他管理线程定期去删除 */ bool ThreadManager::removeRecordThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID) { std::lock_guard lock(m_mutexRecordThreadRefCount); /* 先查找这个引用计数是否存在 */ int refCount = 0; for(auto& pair : m_mapRecordThreadRefCount) { if(pair.first.nSoundCardNum == roadInfo.nSoundCardNum && pair.first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { /* 找到该引用计数,减少引用计数 */ for(auto& it : pair.second) { if(it == compareItemID) { m_mapRecordThreadRefCount[pair.first].remove(it); break; } } refCount = m_mapRecordThreadRefCount[pair.first].size(); break; } } if(refCount > 0) { SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数减少,当前计数: {}", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, refCount); return true; } SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数为0,即将停止该录音通道的所有线程", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); /* 引用计数为0,停止该录音通道的所有线程 */ /* 停止录音线程 */ { std::lock_guard lock(m_mutexRecordThreads); for(auto it : m_recordThreads) { const auto& threadInfo = it->getThreadInfo(); if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { it->stopThread(); SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 录音线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); break; } } } /* 停止分派任务线程 */ { std::lock_guard lock(m_mutexAssignSrcDataThreads); for(auto it : m_assignSrcDataThreads) { const auto& threadInfo = it->getThreadInfo(); if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { it->stopThread(); SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 分派数据线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); break; } } } /* 停止生成wav小文件线程 */ { std::lock_guard lock(m_mutexCreateWAVThreads); for(auto it : m_createWAVThreads) { const auto& threadInfo = it->getThreadInfo(); if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { it->stopThread(); SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 生成wav小文件线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); break; } } } /* 停止计算音量线程 */ { std::lock_guard lock(m_mutexCreateDBThreads); for(auto it : m_createDBThreads) { const auto& threadInfo = it->getThreadInfo(); if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { it->stopThread(); SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 计算音量线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); break; } } } /* 停止生成长文件线程 */ { std::lock_guard lock(m_mutexCreateLongWAVThreads); for(auto it : m_createLongWAVThreads) { const auto& threadInfo = it->getThreadInfo(); if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { it->stopThread(); SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 生成长文件线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); break; } } } /* 停止发送RTP数据线程 */ { std::lock_guard lock(m_mutexRtpSendThreads); for(auto it : m_rtpSendThreads) { const auto& threadInfo = it->getThreadInfo(); if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { it->stopThread(); SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 发送RTP数据线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); break; } } } /* 从引用计数中移除该录音通道 */ auto it = m_mapRecordThreadRefCount.find(roadInfo); if(it != m_mapRecordThreadRefCount.end()) { m_mapRecordThreadRefCount.erase(it); SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程已停止运行", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); } else { SPDLOG_LOGGER_WARN(m_logger, "{}:{} 录音线程引用计数未找到", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); } // 设置销毁录音线程标志 m_isDestroyeRecordThread.store(true); return true; } /* 销毁录音线程 */ void ThreadManager::destroyeRecordThread() { if(!m_isDestroyeRecordThread.load()) { /* 没有需要销毁的线程 */ return; } /* 销毁录音线程 */ { std::lock_guard lock(m_mutexRecordThreads); for(auto it = m_recordThreads.begin(); it != m_recordThreads.end(); ) { BaseRecordThread* pThread = *it; if(pThread != nullptr) { if(pThread->getThreadInfo().threadState == EThreadState::State_Running) { auto pRecordThread = dynamic_cast(pThread); delete pRecordThread; // 删除线程 pThread = nullptr; it = m_recordThreads.erase(it); // 从列表中移除 } } else { it++; } } } /* 销毁分派线程 */ { std::lock_guard lock(m_mutexAssignSrcDataThreads); for(auto it = m_assignSrcDataThreads.begin(); it != m_assignSrcDataThreads.end(); ) { BaseRecordThread* pThread = *it; if(pThread != nullptr) { if(pThread->getThreadInfo().threadState == EThreadState::State_Running) { auto pAssignSrcDataThread = dynamic_cast(pThread); delete pAssignSrcDataThread; // 删除线程 pThread = nullptr; it = m_assignSrcDataThreads.erase(it); // 从列表中移除 } } else { it++; } } } /* 销毁生成wav小文件线程 */ { std::lock_guard lock(m_mutexCreateWAVThreads); for(auto it = m_createWAVThreads.begin(); it != m_createWAVThreads.end(); ) { BaseRecordThread* pThread = *it; if(pThread != nullptr) { if(pThread->getThreadInfo().threadState == EThreadState::State_Running) { auto pCreateWAVThread = dynamic_cast(pThread); delete pCreateWAVThread; // 删除线程 pThread = nullptr; it = m_createWAVThreads.erase(it); // 从列表中移除 } } else { it++; } } } /* 销毁计算音量线程 */ { std::lock_guard lock(m_mutexCreateDBThreads); for(auto it = m_createDBThreads.begin(); it != m_createDBThreads.end(); ) { BaseRecordThread* pThread = *it; if(pThread != nullptr) { if(pThread->getThreadInfo().threadState == EThreadState::State_Running) { auto pCreateDBThread = dynamic_cast(pThread); delete pCreateDBThread; // 删除线程 pThread = nullptr; it = m_createDBThreads.erase(it); // 从列表中移除 } } else { it++; } } } /* 销毁生成长文件线程 */ { std::lock_guard lock(m_mutexCreateLongWAVThreads); for(auto it = m_createLongWAVThreads.begin(); it != m_createLongWAVThreads.end(); ) { BaseRecordThread* pThread = *it; if(pThread != nullptr) { if(pThread->getThreadInfo().threadState == EThreadState::State_Running) { auto pCreateLongWAVThread = dynamic_cast(pThread); delete pCreateLongWAVThread; // 删除线程 pThread = nullptr; it = m_createLongWAVThreads.erase(it); // 从列表中移除 } } else { it++; } } } /* 销毁发送RTP数据线程 */ { std::lock_guard lock(m_mutexRtpSendThreads); for(auto it = m_rtpSendThreads.begin(); it != m_rtpSendThreads.end(); ) { BaseRecordThread* pThread = *it; if(pThread != nullptr) { if(pThread->getThreadInfo().threadState == EThreadState::State_Running) { auto pRtpSendThread = dynamic_cast(pThread); delete pRtpSendThread; // 删除线程 pThread = nullptr; it = m_rtpSendThreads.erase(it); // 从列表中移除 } } else { it++; } } } /* 销毁标志位置为false */ m_isDestroyeRecordThread.store(false); } /* 查找录音线程 */ BaseRecordThread* ThreadManager::findRecordThread(EThreadType type, int cardID, int recordID) { switch(type) { case EThreadType::Type_RecordSrc: /* 录音线程 */ { std::lock_guard lock(m_mutexRecordThreads); for (auto& pThread : m_recordThreads) { if (pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID && pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID) { return pThread; } } } break; case EThreadType::Type_CreateWAV: /* 创建wav小文件和分离左右声道的线程 */ { std::lock_guard lock(m_mutexCreateWAVThreads); for (auto& pThread : m_createWAVThreads) { if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return pThread; } } } break; case EThreadType::Type_CreateDB: /* 计算音量和反相的线程 */ { std::lock_guard lock(m_mutexCreateDBThreads); for (auto& pThread : m_createDBThreads) { if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return pThread; } } } break; case EThreadType::Type_CreateLongWAV: /* 创建长文件的线程 */ { std::lock_guard lock(m_mutexCreateLongWAVThreads); for (auto& pThread : m_createLongWAVThreads) { if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return pThread; } } } break; case EThreadType::Type_AssignSrcData: /* 分派数据线程 */ { std::lock_guard lock(m_mutexAssignSrcDataThreads); for (auto& pThread : m_assignSrcDataThreads) { if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return pThread; } } } break; case EThreadType::Type_RtpSend: /* RTP发送线程 */ { std::lock_guard lock(m_mutexRtpSendThreads); for (auto& pThread : m_rtpSendThreads) { if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return pThread; } } } break; default: SPDLOG_LOGGER_ERROR(m_logger, "{} 查找录音线程失败,未知线程类型: {}", m_logBase, static_cast(type)); return nullptr; // 未知线程类型 } return nullptr; } /* 获取创建WAV线程指针 */ CreateWAVThread* ThreadManager::getCreateWAVThread(int cardID, int recordID) { std::lock_guard lock(m_mutexCreateWAVThreads); for(auto& pThread : m_createWAVThreads) { if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return dynamic_cast(pThread); } } return nullptr; } /* 获取创建音量值的线程 */ CreateDBThread* ThreadManager::getCreateDBThread(int cardID, int recordID) { std::lock_guard lock(m_mutexCreateDBThreads); for(auto& pThread : m_createDBThreads) { if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return dynamic_cast(pThread); } } return nullptr; } /* 获取发送Rtp数据的线程 */ RTPOneRoadThread* ThreadManager::getRtpSendThread(int cardID, int recordID) { std::lock_guard lock(m_mutexRtpSendThreads); for(auto& pThread : m_rtpSendThreads) { if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return dynamic_cast(pThread); } } return nullptr; } /* 获取录制报警文件的线程 */ CreateLongFileThread* ThreadManager::getCreateLongFileThread(int cardID, int recordID) { std::lock_guard lock(m_mutexCreateLongWAVThreads); for(auto& pThread : m_createLongWAVThreads) { if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID && pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID) { return dynamic_cast(pThread); } } return nullptr; } /* ------------------------------------------------------------------------------------------- * 获取计算线程,如果该线程不存在则创建该线程 * 当不需要此线程后,调用remove()函数去掉该线程 * -------------------------------------------------------------------------------------------- */ /* 获取一致性比对线程,线程不存在则创建 */ // ConsistencyCompareThread* ThreadManager::getConsistencyCompareThread(const SoundCardRoadInfo_t& roadInfo1, const SoundCardRoadInfo_t& roadInfo2) // { // std::lock_guard lock(m_mutexConsistencyCompareThreads); // for(const auto pThread : m_listConsistencyCompareThreads) // { // if(pThread->isRoadEqual(roadInfo1, roadInfo2)) // { // return pThread; // 找到相同的线程,直接返回 // } // } // /* 没找到该线程,创建新的线程 */ // CompareItemRoadInfo_t item1; // item1.nCompareRoadNum = 1; // item1.scRoadInfo = roadInfo1; // CompareItemRoadInfo_t item2; // item2.nCompareRoadNum = 2; // item2.scRoadInfo = roadInfo2; // CalculateThreadInfo_t threadInfo; // threadInfo.compareItemInfo.mapRoad.insert(item1.nCompareRoadNum, item1); // threadInfo.compareItemInfo.mapRoad.insert(item2.nCompareRoadNum, item2); // ConsistencyCompareThread* newThread = new ConsistencyCompareThread(threadInfo); // if(newThread == nullptr) // { // SPDLOG_LOGGER_ERROR(m_logger, "创建一致性比对线程失败"); // return nullptr; // 创建失败 // } // CPPTP.add_task(&ConsistencyCompareThread::threadTask, newThread); // m_listConsistencyCompareThreads.push_back(newThread); // m_referCountConsistencyCompare++; // 引用计数加一 // return newThread; // } /* 去掉线程,线程使用的计数减一,计数为0则销毁该线程 */ // bool ThreadManager::removeConsistencyCompareThread(SoundCardRoadInfo_t& roadInfo1, SoundCardRoadInfo_t& roadInfo2) // { // std::lock_guard lock(m_mutexConsistencyCompareThreads); // ConsistencyCompareThread* pThreadToRemove = nullptr; // for(const auto pThread : m_listConsistencyCompareThreads) // { // if(pThread->isRoadEqual(roadInfo1, roadInfo2)) // { // pThreadToRemove = pThread; // 找到相同的线程,直接返回 // break; // } // } // if(pThreadToRemove == nullptr) // { // SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程未找到", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum, // roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum); // return false; // 没找到该线程 // } // m_referCountConsistencyCompare--; // 引用计数减一 // if(m_referCountConsistencyCompare <= 0) // { // /* 停止线程,并一直等待其停止 */ // pThreadToRemove->stopThreadBlock(); // m_listConsistencyCompareThreads.remove(pThreadToRemove); // 从列表中移除 // delete pThreadToRemove; // 删除线程 // pThreadToRemove = nullptr; // m_referCountConsistencyCompare = 0; // 重置引用计数 // SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程已销毁", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum, // roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum); // } // return true; // } /* 获取噪音检测线程 */ NoiseDetectThread* ThreadManager::getNoiseDetectThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID) { std::lock_guard lock(m_mutexNoiseDetectThreads); NoiseDetectThread* pNoiseDetectThread = nullptr; for(const auto pThread : m_listNoiseDetectThreads) { const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo(); if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { pNoiseDetectThread = pThread; break; } } /* 判断引用计数是否需要增加 */ if(pNoiseDetectThread != nullptr) { /* 查找这个通道是否在列表中(实际上肯定在列表中) */ bool isExist = false; for(auto it = m_mapNoiseDetectThreadRefCount.begin(); it != m_mapNoiseDetectThreadRefCount.end(); ++it) { if(it->first.nSoundCardNum == roadInfo.nSoundCardNum && it->first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { bool isFound = false; for(auto& id : it->second) { if(id == compareItemID) { isFound = true; break; // 找到相同的对比项ID,直接返回 } } if(!isFound) { it->second.push_back(compareItemID); // 添加新的对比项ID SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 噪音检测线程引用计数增加,当前计数: {}", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, it->second.size()); } isExist = true; break; } } if(!isExist) { /* 不在引用计数的列表中,添加进入 */ m_mapNoiseDetectThreadRefCount[roadInfo].push_back(compareItemID); } return pNoiseDetectThread; } /* 没找到该线程,创建新的线程 */ CalculateThreadInfo_t threadInfo; CompareItemRoadInfo_t item; item.nCompareRoadNum = 1; // 假设噪音检测线程 item.scRoadInfo = roadInfo; threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item); NoiseDetectThread* newThread = new NoiseDetectThread(threadInfo); if(newThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "创建噪音检测线程失败"); return nullptr; // 创建失败 } CPPTP.add_task(&NoiseDetectThread::threadTask, newThread); m_listNoiseDetectThreads.push_back(newThread); /* 不在引用计数的列表中,添加进入 */ m_mapNoiseDetectThreadRefCount[roadInfo].push_back(compareItemID); return newThread; } /* 去掉噪音检测线程,线程使用的计数减一,计数为0则销毁该线程 */ bool ThreadManager::removeNoiseDetectThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID) { std::lock_guard lock(m_mutexNoiseDetectThreads); NoiseDetectThread* pThreadToRemove = nullptr; for(const auto pThread : m_listNoiseDetectThreads) { const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo(); if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { pThreadToRemove = pThread; // 找到相同的线程,直接返回 break; } } if(pThreadToRemove == nullptr) { SPDLOG_LOGGER_WARN(m_logger, "{}:{} 噪音检测线程未找到", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); return false; // 没找到该线程 } /* 引用计数减一 */ int useCount = 0; for(auto it = m_mapNoiseDetectThreadRefCount.begin(); it != m_mapNoiseDetectThreadRefCount.end(); ++it) { if(it->first.nSoundCardNum == roadInfo.nSoundCardNum && it->first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) { /* 将对比项ID从列表中删除 */ auto& compareItemList = it->second; auto itemIt = std::find(compareItemList.begin(), compareItemList.end(), compareItemID); if(itemIt != compareItemList.end()) { compareItemList.erase(itemIt); // 移除该对比项ID SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 噪音检测线程引用计数减少,当前计数: {}", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, compareItemList.size()); } useCount = compareItemList.size(); // 获取当前引用计数 if(useCount <= 0) { m_mapNoiseDetectThreadRefCount.erase(it); // 如果引用计数为0,则从列表中移除 } break; // 找到后退出循环 } } if(useCount <= 0) { SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程引用计数为0,准备销毁该线程", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); pThreadToRemove->stopThreadBlock(); // 停止线程 m_listNoiseDetectThreads.remove(pThreadToRemove); // 从列表中移除 delete pThreadToRemove; // 删除线程 pThreadToRemove = nullptr; SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程已销毁", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); } return true; } // /* 获取音量报警线程 */ // CalculateDBPhaseThread* ThreadManager::getCalculateDBPhaseThread(const SoundCardRoadInfo_t& roadInfo) // { // std::lock_guard lock(m_mutexCalculateDBPhaseThreads); // for(const auto pThread : m_listCalculateDBPhaseThreads) // { // const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo(); // if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum && // threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum) // { // return pThread; // 找到相同的线程,直接返回 // } // } // /* 没找到该线程,创建新的线程 */ // CompareItemRoadInfo_t item; // item.nCompareRoadNum = 1; // 假设音量报警线程 // item.scRoadInfo = roadInfo; // threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item); // CalculateDBPhaseThread* newThread = new CalculateDBPhaseThread(threadInfo); // if(newThread == nullptr) // { // SPDLOG_LOGGER_ERROR(m_logger, "创建音量报警线程失败"); // return nullptr; // 创建失败 // } // CPPTP.add_task(&CalculateDBPhaseThread::threadTask, newThread); // m_listCalculateDBPhaseThreads.push_back(newThread); // m_referCountCalculateDBPhase++; // 引用计数加一 // return newThread; // } // /* 去掉音量报警线程,线程使用的计数减一,计数为0则销毁该线程 */ // bool ThreadManager::removeCalculateDBPhaseThread(RoadNumberInfo_t& roadInfo) // { // std::lock_guard lock(m_mutexCalculateDBPhaseThreads); // CalculateDBPhaseThread* pThreadToRemove = nullptr; // for(const auto pThread : m_listCalculateDBPhaseThreads) // { // if(pThread->getRoadInfo().roadID == roadInfo.roadID) // { // pThreadToRemove = pThread; // 找到相同的线程,直接返回 // break; // } // } // if(pThreadToRemove == nullptr) // { // SPDLOG_LOGGER_WARN(m_logger, "{} 音量报警线程未找到", roadInfo.strRoadName); // return false; // 没找到该线程 // } // m_referCountCalculateDBPhase--; // 引用计数减一 // if(m_referCountCalculateDBPhase <= 0) // { // pThreadToRemove->stopThread(); // 停止线程 // m_listCalculateDBPhaseThreads.remove(pThreadToRemove); // 从列表中移除 // delete pThreadToRemove; // 删除线程 // pThreadToRemove = nullptr; // m_referCountCalculateDBPhase = 0; // 重置引用计数 // SPDLOG_LOGGER_INFO(m_logger, "{} 音量报警线程已销毁", roadInfo.strRoadName); // } // return true; // } /* RTP线程函数,套一层壳 */ void ThreadManager::thread_RTPSend(RecordThreadInfo_t& threadInfo) { RTPOneRoadThread* pRtpSendThread = new RTPOneRoadThread(threadInfo); if(pRtpSendThread == nullptr) { SPDLOG_ERROR("{}:{} 创建RTP发送线程失败", threadInfo.cardRoadInfo.strSoundCardName.toStdString(), threadInfo.cardRoadInfo.roadInfo.nRoadNum); return; } /* 先加入队列,再开启线程 */ std::lock_guard lock(ThreadMan.m_mutexRtpSendThreads); ThreadMan.m_rtpSendThreads.push_back(pRtpSendThread); // pRtpSendThread->threadTask(); }