#include "ThreadManager.h"

#include "CreateWAVThread.h"
#include "CreateDBThread.h"
#include "ConsistencyCompareThread.h"
#include "GlobalVariable.h"
#include "NoiseDetectThread.h"
#include "RtpOneRoadThread.h"
#include "CreateRecordFileThread.h"
#include "RecordThread.h"
#include "AssignSrcDataThread.h"
#include "SoundCards.h"
#include "ThreadPool.h"

/* NOTE(review): two angle-bracket include targets were unreadable in the reviewed copy of this
   file. The headers below cover the standard-library / Qt names this file uses directly;
   confirm against the original include list. */
#include <algorithm>
#include <chrono>
#include <mutex>
#include <string>

#include <QList>


namespace
{

/* Returns the first non-null entry of `threads` whose PCM name equals `pcmName`,
   or nullptr when there is none. The caller must hold the mutex guarding `threads`. */
template <typename Container, typename Name>
typename Container::value_type findThreadByPcmName(Container& threads, const Name& pcmName)
{
    for(auto& pThread : threads)
    {
        if(pThread != nullptr &&
           pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName)
        {
            return pThread;
        }
    }
    return nullptr;
}

/* Calls thread_stop_block() on the first thread of `threads` bound to `pcmName`.
   Returns true when such a thread was found. The caller must hold the list's mutex. */
template <typename Container, typename Name>
bool stopThreadByPcmName(Container& threads, const Name& pcmName)
{
    auto pThread = findThreadByPcmName(threads, pcmName);
    if(pThread == nullptr)
    {
        return false;
    }
    pThread->thread_stop_block();
    return true;
}

/* Removes null entries from `threads` and deletes + removes every entry whose state is
   State_Stopped or State_Error. `ThreadT` is the concrete type the entry is cast to before
   deletion. The iterator is only advanced when nothing was erased, so no element is skipped
   and end() is never incremented. The caller must hold the list's mutex. */
template <typename ThreadT, typename Container, typename Logger>
void destroyFinishedThreads(Container& threads,
                            [[maybe_unused]] const Logger& logger,
                            [[maybe_unused]] const std::string& logBase,
                            [[maybe_unused]] const char* label)
{
    for(auto it = threads.begin(); it != threads.end(); )
    {
        BaseRecordThread* pThread = *it;
        if(pThread == nullptr)
        {
            /* Drop null entries */
            it = threads.erase(it);
            continue;
        }

        const auto threadState = pThread->getThreadInfo().threadState.load();
        const bool finished = (EThreadState::State_Stopped == threadState) ||
                              (EThreadState::State_Error == threadState);
        if(!finished)
        {
            ++it;
            continue;
        }

        SPDLOG_LOGGER_DEBUG(logger, "{} 销毁{}: {}:{}", logBase, label,
            pThread->getThreadInfo().cardRoadInfo.strSoundCardName,
            pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName);

        delete dynamic_cast<ThreadT*>(pThread);
        it = threads.erase(it);
    }
}

} // namespace


/* Looks up the "ACAServer" logger. When it is not registered a message is printed and the
   spdlog default logger is used instead, so later log calls never dereference a null logger. */
ThreadManager::ThreadManager()
{
    m_logBase = "ThreadManager";

    m_logger = spdlog::get("ACAServer");
    if(m_logger == nullptr)
    {
        fmt::print("ThreadManager: ACAServer Logger not found.\n");
        m_logger = spdlog::default_logger();
    }
}

/* Start all threads */
// void ThreadManager::startAllThreads()
// {

// }

/* Stop all threads. Currently a no-op. */
void ThreadManager::stopAllThreads()
{

}

/* Thread function: reaps record channels.
   Loops forever. Each round waits up to 2 seconds for the destroy flag, then stops every
   channel whose reference list is empty and deletes the thread objects that have finished.
   m_mutexRecordThreadRefCount is held for the whole round. */
void ThreadManager::thread_destroyeRecordThread()
{
    while(true)
    {
        std::unique_lock lock(m_mutexRecordThreadRefCount);

        /* wait_for returns the predicate's final value: false means timeout without a request */
        const bool destroyRequested = m_condVarDestroyRecord.wait_for(lock, std::chrono::seconds(2), [this]() {
            return m_isDestroyeRecordThread.load();
        });
        if(!destroyRequested)
        {
            continue;
        }
        /* Reset the destroy flag */
        m_isDestroyeRecordThread.store(false);

        /* Collect the channels first: stopRecordAllThreads() erases from the map,
           so it must not be called while iterating over it. */
        QList<std::string> removePCMList;
        for(const auto& entry : m_mapRecordThreadRefCount)
        {
            if(entry.second.empty())
            {
                removePCMList.push_back(entry.first);
            }
        }

        /* Stop the threads of every unreferenced channel */
        for(const auto& pcmName : removePCMList)
        {
            OneSoundCardPCMInfo_t pcmInfo;
            pcmInfo.pcmInfo.strPCMName = pcmName;
            stopRecordAllThreads(pcmInfo);
        }

        /* Delete the thread objects that have finished */
        destroyRecordThread();
    }
}


/* Creates a record channel together with its worker threads, or, when the channel already
   exists, registers compareItemID as one more user of it.
   Note: per the original author this runs in the compare-item thread.
   Always returns true. */
bool ThreadManager::createRecordThread(const OneSoundCardPCMInfo_t& pcmInfo, int compareItemID)
{
    std::unique_lock refLock(m_mutexRecordThreadRefCount);

    /* Channel already known: only add the compare item to its reference list */
    auto itRef = m_mapRecordThreadRefCount.find(pcmInfo.pcmInfo.strPCMName);
    if(itRef != m_mapRecordThreadRefCount.end())
    {
        SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 录音线程已存在", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);

        auto& idList = itRef->second;
        /* Each compare item is counted at most once */
        if(std::find(idList.begin(), idList.end(), compareItemID) == idList.end())
        {
            idList.push_back(compareItemID);
        }
        SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数增加,当前计数: {}",
            pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName, idList.size());
        return true;
    }

    /* Channel does not exist yet: create its threads one by one.
       NOTE(review): a plain `new` reports failure by throwing, so the nullptr branches below
       are defensive only; a failure there is logged and creation continues. */
    RecordThreadInfo_t threadInfo;
    threadInfo.cardRoadInfo = pcmInfo;
    threadInfo.threadState.store(EThreadState::State_Inited);

    /* Thread producing the small wav files */
    threadInfo.threadType = EThreadType::Type_CreateWAV;
    CreateWAVThread* pCreateWAVThread = new CreateWAVThread(threadInfo);
    if(pCreateWAVThread == nullptr)
    {
        SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成wav小文件线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
    }
    else
    {
        CPPTP.add_task(&CreateWAVThread::thread_task, pCreateWAVThread);
        std::lock_guard listLock(m_mutexCreateWAVThreads);
        m_createWAVThreads.push_back(pCreateWAVThread);
    }

    /* Thread computing the volume */
    threadInfo.threadType = EThreadType::Type_CreateDB;
    CreateDBThread* pCreateDBThread = new CreateDBThread(threadInfo);
    if(pCreateDBThread == nullptr)
    {
        SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建计算音量线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
    }
    else
    {
        CPPTP.add_task(&CreateDBThread::thread_task, pCreateDBThread);
        std::lock_guard listLock(m_mutexCreateDBThreads);
        m_createDBThreads.push_back(pCreateDBThread);
    }

    /* Thread producing the long files */
    threadInfo.threadType = EThreadType::Type_CreateLongWAV;
    CreateRecordFileThread* pCreateLongWAVThread = new CreateRecordFileThread(threadInfo);
    if(pCreateLongWAVThread == nullptr)
    {
        SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成长文件线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
    }
    else
    {
        CPPTP.add_task(&CreateRecordFileThread::thread_task, pCreateLongWAVThread);
        std::lock_guard listLock(m_mutexCreateLongWAVThreads);
        m_createLongWAVThreads.push_back(pCreateLongWAVThread);
    }

    /* Thread sending the RTP data; the object itself is created inside thread_RTPSend */
    threadInfo.threadType = EThreadType::Type_RtpSend;
    CPPTP.add_task(&ThreadManager::thread_RTPSend, threadInfo);

    /* Thread dispatching the source data */
    threadInfo.threadType = EThreadType::Type_AssignSrcData;
    AssignSrcDataThread* pAssignSrcDataThread = new AssignSrcDataThread(threadInfo);
    if(pAssignSrcDataThread == nullptr)
    {
        SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建分派数据线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
    }
    else
    {
        CPPTP.add_task(&AssignSrcDataThread::thread_task, pAssignSrcDataThread);
        std::lock_guard listLock(m_mutexAssignSrcDataThreads);
        m_assignSrcDataThreads.push_back(pAssignSrcDataThread);
    }

    /* Recording thread */
    threadInfo.threadType = EThreadType::Type_RecordSrc;
    RecordThread* pRecordThread = new RecordThread(threadInfo);
    if(pRecordThread == nullptr)
    {
        SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建录音线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
    }
    else
    {
        CPPTP.add_task(&RecordThread::thread_task, pRecordThread);
        std::lock_guard listLock(m_mutexRecordThreads);
        m_recordThreads.push_back(pRecordThread);
    }

    /* Channel created: register the first user */
    m_mapRecordThreadRefCount[pcmInfo.pcmInfo.strPCMName].push_back(compareItemID);

    return true;
}

/**
    Unregisters compareItemID from a record channel.
    When the reference list becomes empty this only raises the destroy flag and wakes the
    reaper (thread_destroyeRecordThread); the threads are stopped and deleted there.
    Note: per the original author this runs in the compare-item thread.
    Always returns true.
 */
bool ThreadManager::removeRecordThread(const OneSoundCardPCMInfo_t& pcmInfo, int compareItemID)
{
    std::unique_lock lock(m_mutexRecordThreadRefCount);

    /* -1 means "channel not found" */
    int refCount = -1;
    auto itRef = m_mapRecordThreadRefCount.find(pcmInfo.pcmInfo.strPCMName);
    if(itRef != m_mapRecordThreadRefCount.end())
    {
        auto& idList = itRef->second;
        /* Only touch the list when the ID is really registered */
        if(std::find(idList.begin(), idList.end(), compareItemID) != idList.end())
        {
            idList.remove(compareItemID);
        }
        refCount = static_cast<int>(idList.size());
    }

    std::string logBase = fmt::format("{}:{}", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
    if(refCount > 0)
    {
        SPDLOG_LOGGER_INFO(m_logger, "{} 录音线程引用计数减少,当前计数: {}", logBase, refCount);
        return true;
    }
    if(refCount < 0)
    {
        SPDLOG_LOGGER_WARN(m_logger, "{} 录音线程未找到,可能已经释放", logBase);
        return true;
    }

    SPDLOG_LOGGER_WARN(m_logger, "{} 录音线程引用计数为0,需要释放该录音通道的所有线程", logBase);

    /* Raise the destroy flag and wake the reaper thread */
    m_isDestroyeRecordThread.store(true);
    m_condVarDestroyRecord.notify_one();

    return true;
}


/* Finds the thread of the given type that serves `pcmName`.
   Returns nullptr when there is none or when `type` is not handled here (logged as error). */
BaseRecordThread* ThreadManager::findRecordThread(EThreadType type, std::string pcmName)
{
    switch(type)
    {
        case EThreadType::Type_RecordSrc:       /* recording thread */
        {
            std::lock_guard lock(m_mutexRecordThreads);
            return findThreadByPcmName(m_recordThreads, pcmName);
        }
        case EThreadType::Type_CreateWAV:       /* small wav files / channel splitting */
        {
            std::lock_guard lock(m_mutexCreateWAVThreads);
            return findThreadByPcmName(m_createWAVThreads, pcmName);
        }
        case EThreadType::Type_CreateDB:        /* volume and phase inversion */
        {
            std::lock_guard lock(m_mutexCreateDBThreads);
            return findThreadByPcmName(m_createDBThreads, pcmName);
        }
        case EThreadType::Type_CreateLongWAV:   /* long files */
        {
            std::lock_guard lock(m_mutexCreateLongWAVThreads);
            return findThreadByPcmName(m_createLongWAVThreads, pcmName);
        }
        case EThreadType::Type_AssignSrcData:   /* source data dispatching */
        {
            std::lock_guard lock(m_mutexAssignSrcDataThreads);
            return findThreadByPcmName(m_assignSrcDataThreads, pcmName);
        }
        case EThreadType::Type_RtpSend:         /* RTP sending */
        {
            std::lock_guard lock(m_mutexRtpSendThreads);
            return findThreadByPcmName(m_rtpSendThreads, pcmName);
        }
        default:
            SPDLOG_LOGGER_ERROR(m_logger, "{} 查找录音线程失败,未知线程类型: {}", m_logBase, static_cast<int>(type));
            return nullptr;
    }

    return nullptr;
}

/* Returns the CreateWAV thread serving `pcmName`, or nullptr */
CreateWAVThread* ThreadManager::getCreateWAVThread(std::string pcmName)
{
    std::lock_guard lock(m_mutexCreateWAVThreads);
    return dynamic_cast<CreateWAVThread*>(findThreadByPcmName(m_createWAVThreads, pcmName));
}

/* Returns the volume (CreateDB) thread serving `pcmName`, or nullptr */
CreateDBThread* ThreadManager::getCreateDBThread(std::string pcmName)
{
    std::lock_guard lock(m_mutexCreateDBThreads);
    return dynamic_cast<CreateDBThread*>(findThreadByPcmName(m_createDBThreads, pcmName));
}

/* Returns the RTP sending thread serving `pcmName`, or nullptr */
RTPOneRoadThread* ThreadManager::getRtpSendThread(std::string pcmName)
{
    std::lock_guard lock(m_mutexRtpSendThreads);
    return dynamic_cast<RTPOneRoadThread*>(findThreadByPcmName(m_rtpSendThreads, pcmName));
}

/* Returns the record-file (long wav) thread serving `pcmName`, or nullptr */
CreateRecordFileThread* ThreadManager::getCreateRecordFileThread(std::string pcmName)
{
    std::lock_guard lock(m_mutexCreateLongWAVThreads);
    return dynamic_cast<CreateRecordFileThread*>(findThreadByPcmName(m_createLongWAVThreads, pcmName));
}


/* RTP thread function: a thin wrapper that creates the RTP sender, registers it in the
   manager's list and then runs its task on the calling thread. */
void ThreadManager::thread_RTPSend(RecordThreadInfo_t& threadInfo)
{
    RTPOneRoadThread* pRtpSendThread = new RTPOneRoadThread(threadInfo);
    if(pRtpSendThread == nullptr)
    {
        SPDLOG_ERROR("{}:{} 创建RTP发送线程失败", threadInfo.cardRoadInfo.strSoundCardName, threadInfo.cardRoadInfo.pcmInfo.strPCMName);
        return;
    }

    /* Register first, then run. The scoped guard releases the mutex even if push_back throws,
       and it must be released before the task starts running. */
    {
        std::lock_guard lock(ThreadMan.m_mutexRtpSendThreads);
        ThreadMan.m_rtpSendThreads.push_back(pRtpSendThread);
    }

    pRtpSendThread->thread_task();
}

/* Stops every thread of one record channel and removes the channel from the reference map.
   The map is modified without taking m_mutexRecordThreadRefCount here; the caller visible in
   this file (thread_destroyeRecordThread) already holds it. */
void ThreadManager::stopRecordAllThreads(const OneSoundCardPCMInfo_t& pcmInfo)
{
    std::string logBase = fmt::format("{}:{}", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
    const auto& pcmName = pcmInfo.pcmInfo.strPCMName;

    /* Recording thread */
    {
        std::lock_guard lock(m_mutexRecordThreads);
        SPDLOG_LOGGER_WARN(m_logger, "{} 录音线程列表Size: {}", logBase, m_recordThreads.size());
        auto pRecord = findThreadByPcmName(m_recordThreads, pcmName);
        if(pRecord != nullptr)
        {
            SPDLOG_LOGGER_TRACE(m_logger, "{} 准备停止录音线程: {}", logBase, pcmName);
            pRecord->thread_stop_block();
            SPDLOG_LOGGER_TRACE(m_logger, "{} 录音线程已停止", logBase);
        }
    }
    /* Source data dispatching thread */
    {
        std::lock_guard lock(m_mutexAssignSrcDataThreads);
        if(stopThreadByPcmName(m_assignSrcDataThreads, pcmName))
        {
            SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据线程已停止", logBase);
        }
    }
    /* Small wav file thread */
    {
        std::lock_guard lock(m_mutexCreateWAVThreads);
        if(stopThreadByPcmName(m_createWAVThreads, pcmName))
        {
            SPDLOG_LOGGER_TRACE(m_logger, "{} 生成wav小文件线程已停止", logBase);
        }
    }
    /* Volume thread */
    {
        std::lock_guard lock(m_mutexCreateDBThreads);
        if(stopThreadByPcmName(m_createDBThreads, pcmName))
        {
            SPDLOG_LOGGER_TRACE(m_logger, "{} 计算音量线程已停止", logBase);
        }
    }
    /* Long file thread */
    {
        std::lock_guard lock(m_mutexCreateLongWAVThreads);
        if(stopThreadByPcmName(m_createLongWAVThreads, pcmName))
        {
            SPDLOG_LOGGER_TRACE(m_logger, "{} 生成长文件线程已停止", logBase);
        }
    }
    /* RTP sending thread */
    {
        std::lock_guard lock(m_mutexRtpSendThreads);
        if(stopThreadByPcmName(m_rtpSendThreads, pcmName))
        {
            SPDLOG_LOGGER_TRACE(m_logger, "{} 发送RTP数据线程已停止", logBase);
        }
    }

    /* Remove the channel from the reference map */
    auto itRef = m_mapRecordThreadRefCount.find(pcmName);
    if(itRef != m_mapRecordThreadRefCount.end())
    {
        m_mapRecordThreadRefCount.erase(itRef);
        SPDLOG_LOGGER_WARN(m_logger, " ➢ 录音通道: {} 线程已全部停止运行", logBase);
    }
    else
    {
        SPDLOG_LOGGER_WARN(m_logger, "{} 录音线程引用计数未找到", logBase);
    }
}

/* Deletes every thread object whose state is State_Stopped or State_Error and removes it
   (and any null entry) from its list. Each list is processed under its own mutex. */
void ThreadManager::destroyRecordThread()
{
    SPDLOG_LOGGER_DEBUG(m_logger, "{} 准备销毁录音线程...", m_logBase);

    /* Recording threads */
    {
        std::lock_guard lock(m_mutexRecordThreads);
        destroyFinishedThreads<RecordThread>(m_recordThreads, m_logger, m_logBase, "录音线程");
    }
    /* Source data dispatching threads */
    {
        std::lock_guard lock(m_mutexAssignSrcDataThreads);
        destroyFinishedThreads<AssignSrcDataThread>(m_assignSrcDataThreads, m_logger, m_logBase, "分派数据线程");
    }
    /* Small wav file threads */
    {
        std::lock_guard lock(m_mutexCreateWAVThreads);
        destroyFinishedThreads<CreateWAVThread>(m_createWAVThreads, m_logger, m_logBase, " CreateWAV 线程");
    }
    /* Volume threads */
    {
        std::lock_guard lock(m_mutexCreateDBThreads);
        destroyFinishedThreads<CreateDBThread>(m_createDBThreads, m_logger, m_logBase, " CreateDB 线程");
    }
    /* Long file threads */
    {
        std::lock_guard lock(m_mutexCreateLongWAVThreads);
        destroyFinishedThreads<CreateRecordFileThread>(m_createLongWAVThreads, m_logger, m_logBase, " CreateRecordFileThread 线程");
    }
    /* RTP sending threads */
    {
        std::lock_guard lock(m_mutexRtpSendThreads);
        destroyFinishedThreads<RTPOneRoadThread>(m_rtpSendThreads, m_logger, m_logBase, " RTP Send 线程");
    }

    SPDLOG_LOGGER_DEBUG(m_logger, "{} 录音线程销毁完成", m_logBase);
}