ThreadManager.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  #include "ThreadManager.h"

  #include <new>

  #include "CreateWAVThread.h"
  #include "CreateDBPhaseThread.h"
  #include "CalculateDBThread.h"
  #include "ConsistencyCompareThread.h"
  #include "GlobalVariable.h"
  #include "NoiseDetectThread.h"
  #include "RtpOneRoadThread.h"
  #include "ThreadPool.h"
  10. ThreadManager::ThreadManager()
  11. {
  12. m_logger = spdlog::get("ACAServer");
  13. if(m_logger == nullptr)
  14. {
  15. fmt::print("ThreadManager: ACAServer Logger not found.\n");
  16. return;
  17. }
  18. m_logBase = "ThreadManager";
  19. }
  20. /* 启动所有线程 */
  21. // void ThreadManager::startAllThreads()
  22. // {
  23. // }
  24. /* 停止所有线程 */
  25. void ThreadManager::stopAllThreads()
  26. {
  27. }
  28. // /* 添加线程 */
  29. // void ThreadManager::addThread(StRecordThreadInfo* pThreadInfo)
  30. // {
  31. // }
  32. /* 查找录音线程 */
  33. BaseRecordThread* ThreadManager::findRecordThread(EThreadType type, int cardID, int recordID)
  34. {
  35. switch(type)
  36. {
  37. case EThreadType::Type_RecordSrc: /* 录音线程 */
  38. for (auto& pThread : m_recordThreads)
  39. {
  40. if (pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID &&
  41. pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID)
  42. {
  43. return pThread;
  44. }
  45. }
  46. break;
  47. case EThreadType::Type_CreateWAV: /* 创建wav小文件和分离左右声道的线程 */
  48. for (auto& pThread : m_createWAVThreads)
  49. {
  50. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  51. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  52. {
  53. return pThread;
  54. }
  55. }
  56. break;
  57. case EThreadType::Type_CalculateDBAndPhase: /* 计算音量和反相的线程 */
  58. for (auto& pThread : m_createDBPhaseThreads)
  59. {
  60. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  61. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  62. {
  63. return pThread;
  64. }
  65. }
  66. break;
  67. case EThreadType::Type_CreateLongWAV: /* 创建长文件的线程 */
  68. for (auto& pThread : m_createLongWAVThreads)
  69. {
  70. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  71. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  72. {
  73. return pThread;
  74. }
  75. }
  76. break;
  77. case EThreadType::Type_AssignSrcData: /* 分派数据线程 */
  78. for (auto& pThread : m_assignSrcDataThreads)
  79. {
  80. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  81. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  82. {
  83. return pThread;
  84. }
  85. }
  86. case EThreadType::Type_RtpSend: /* RTP发送线程 */
  87. for (auto& pThread : m_rtpSendThreads)
  88. {
  89. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  90. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  91. {
  92. return pThread;
  93. }
  94. }
  95. break;
  96. default:
  97. SPDLOG_LOGGER_ERROR(m_logger, "{} 查找录音线程失败,未知线程类型: {}", m_logBase, static_cast<int>(type));
  98. return nullptr; // 未知线程类型
  99. }
  100. return nullptr;
  101. }
  102. /* 获取创建WAV线程指针 */
  103. CreateWAVThread* ThreadManager::getCreateWAVThread(int cardID, int recordID)
  104. {
  105. for(auto& pThread : m_createWAVThreads)
  106. {
  107. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  108. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  109. {
  110. return dynamic_cast<CreateWAVThread*>(pThread);
  111. }
  112. }
  113. return nullptr;
  114. }
  115. /* 获取创建音量值的线程 */
  116. CreateDBPhaseThread* ThreadManager::getCreateDBPhaseThread(int cardID, int recordID)
  117. {
  118. for(auto& pThread : m_createDBPhaseThreads)
  119. {
  120. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  121. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  122. {
  123. return dynamic_cast<CreateDBPhaseThread*>(pThread);
  124. }
  125. }
  126. return nullptr;
  127. }
  128. /* 获取发送Rtp数据的线程 */
  129. RTPOneRoadThread* ThreadManager::getRtpSendThread(int cardID, int recordID)
  130. {
  131. for(auto& pThread : m_rtpSendThreads)
  132. {
  133. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  134. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  135. {
  136. return dynamic_cast<RTPOneRoadThread*>(pThread);
  137. }
  138. }
  139. return nullptr;
  140. }
  141. /* -------------------------------------------------------------------------------------------
  142. * 获取计算线程,如果该线程不存在则创建该线程
  143. * 当不需要此线程后,调用remove()函数去掉该线程
  144. * -------------------------------------------------------------------------------------------- */
  145. /* 获取一致性比对线程,线程不存在则创建 */
  146. ConsistencyCompareThread* ThreadManager::getConsistencyCompareThread(const SoundCardRoadInfo_t& roadInfo1, const SoundCardRoadInfo_t& roadInfo2)
  147. {
  148. std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  149. for(const auto pThread : m_listConsistencyCompareThreads)
  150. {
  151. if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  152. {
  153. return pThread; // 找到相同的线程,直接返回
  154. }
  155. }
  156. /* 没找到该线程,创建新的线程 */
  157. CompareItemRoadInfo_t item1;
  158. item1.nCompareRoadNum = 1;
  159. item1.scRoadInfo = roadInfo1;
  160. CompareItemRoadInfo_t item2;
  161. item2.nCompareRoadNum = 2;
  162. item2.scRoadInfo = roadInfo2;
  163. CalculateThreadInfo_t threadInfo;
  164. threadInfo.compareItemInfo.mapRoad.insert(item1.nCompareRoadNum, item1);
  165. threadInfo.compareItemInfo.mapRoad.insert(item2.nCompareRoadNum, item2);
  166. ConsistencyCompareThread* newThread = new ConsistencyCompareThread(threadInfo);
  167. if(newThread == nullptr)
  168. {
  169. SPDLOG_LOGGER_ERROR(m_logger, "创建一致性比对线程失败");
  170. return nullptr; // 创建失败
  171. }
  172. CPPTP.add_task(&ConsistencyCompareThread::threadTask, newThread);
  173. m_listConsistencyCompareThreads.push_back(newThread);
  174. m_referCountConsistencyCompare++; // 引用计数加一
  175. return newThread;
  176. }
  177. /* 去掉线程,线程使用的计数减一,计数为0则销毁该线程 */
  178. bool ThreadManager::removeConsistencyCompareThread(SoundCardRoadInfo_t& roadInfo1, SoundCardRoadInfo_t& roadInfo2)
  179. {
  180. std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  181. ConsistencyCompareThread* pThreadToRemove = nullptr;
  182. for(const auto pThread : m_listConsistencyCompareThreads)
  183. {
  184. if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  185. {
  186. pThreadToRemove = pThread; // 找到相同的线程,直接返回
  187. break;
  188. }
  189. }
  190. if(pThreadToRemove == nullptr)
  191. {
  192. SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程未找到", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  193. roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  194. return false; // 没找到该线程
  195. }
  196. m_referCountConsistencyCompare--; // 引用计数减一
  197. if(m_referCountConsistencyCompare <= 0)
  198. {
  199. /* 停止线程,并一直等待其停止 */
  200. pThreadToRemove->stopThreadBlock();
  201. m_listConsistencyCompareThreads.remove(pThreadToRemove); // 从列表中移除
  202. delete pThreadToRemove; // 删除线程
  203. pThreadToRemove = nullptr;
  204. m_referCountConsistencyCompare = 0; // 重置引用计数
  205. SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程已销毁", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  206. roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  207. }
  208. return true;
  209. }
  210. /* 获取噪音检测线程 */
  211. NoiseDetectThread* ThreadManager::getNoiseDetectThread(const SoundCardRoadInfo_t& roadInfo)
  212. {
  213. std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  214. for(const auto pThread : m_listNoiseDetectThreads)
  215. {
  216. const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  217. if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  218. threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  219. {
  220. return pThread; // 找到相同的线程,直接返回
  221. }
  222. }
  223. /* 没找到该线程,创建新的线程 */
  224. CalculateThreadInfo_t threadInfo;
  225. CompareItemRoadInfo_t item;
  226. item.nCompareRoadNum = 1; // 假设噪音检测线程
  227. item.scRoadInfo = roadInfo;
  228. threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  229. NoiseDetectThread* newThread = new NoiseDetectThread(threadInfo);
  230. if(newThread == nullptr)
  231. {
  232. SPDLOG_LOGGER_ERROR(m_logger, "创建噪音检测线程失败");
  233. return nullptr; // 创建失败
  234. }
  235. CPPTP.add_task(&NoiseDetectThread::threadTask, newThread);
  236. m_listNoiseDetectThreads.push_back(newThread);
  237. m_referCountNoiseDetect++; // 引用计数加一
  238. return newThread;
  239. }
  240. /* 去掉噪音检测线程,线程使用的计数减一,计数为0则销毁该线程 */
  241. bool ThreadManager::removeNoiseDetectThread(SoundCardRoadInfo_t& roadInfo)
  242. {
  243. std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  244. NoiseDetectThread* pThreadToRemove = nullptr;
  245. for(const auto pThread : m_listNoiseDetectThreads)
  246. {
  247. const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  248. if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  249. threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  250. {
  251. pThreadToRemove = pThread; // 找到相同的线程,直接返回
  252. break;
  253. }
  254. }
  255. if(pThreadToRemove == nullptr)
  256. {
  257. SPDLOG_LOGGER_WARN(m_logger, "{}:{} 噪音检测线程未找到", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  258. return false; // 没找到该线程
  259. }
  260. m_referCountNoiseDetect--; // 引用计数减一
  261. if(m_referCountNoiseDetect <= 0)
  262. {
  263. pThreadToRemove->stopThreadBlock(); // 停止线程
  264. m_listNoiseDetectThreads.remove(pThreadToRemove); // 从列表中移除
  265. delete pThreadToRemove; // 删除线程
  266. pThreadToRemove = nullptr;
  267. m_referCountNoiseDetect = 0; // 重置引用计数
  268. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程已销毁", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  269. }
  270. return true;
  271. }
  272. // /* 获取音量报警线程 */
  273. // CalculateDBPhaseThread* ThreadManager::getCalculateDBPhaseThread(const SoundCardRoadInfo_t& roadInfo)
  274. // {
  275. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  276. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  277. // {
  278. // const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  279. // if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  280. // threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  281. // {
  282. // return pThread; // 找到相同的线程,直接返回
  283. // }
  284. // }
  285. // /* 没找到该线程,创建新的线程 */
  286. // CompareItemRoadInfo_t item;
  287. // item.nCompareRoadNum = 1; // 假设音量报警线程
  288. // item.scRoadInfo = roadInfo;
  289. // threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  290. // CalculateDBPhaseThread* newThread = new CalculateDBPhaseThread(threadInfo);
  291. // if(newThread == nullptr)
  292. // {
  293. // SPDLOG_LOGGER_ERROR(m_logger, "创建音量报警线程失败");
  294. // return nullptr; // 创建失败
  295. // }
  296. // CPPTP.add_task(&CalculateDBPhaseThread::threadTask, newThread);
  297. // m_listCalculateDBPhaseThreads.push_back(newThread);
  298. // m_referCountCalculateDBPhase++; // 引用计数加一
  299. // return newThread;
  300. // }
  301. // /* 去掉音量报警线程,线程使用的计数减一,计数为0则销毁该线程 */
  302. // bool ThreadManager::removeCalculateDBPhaseThread(RoadNumberInfo_t& roadInfo)
  303. // {
  304. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  305. // CalculateDBPhaseThread* pThreadToRemove = nullptr;
  306. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  307. // {
  308. // if(pThread->getRoadInfo().roadID == roadInfo.roadID)
  309. // {
  310. // pThreadToRemove = pThread; // 找到相同的线程,直接返回
  311. // break;
  312. // }
  313. // }
  314. // if(pThreadToRemove == nullptr)
  315. // {
  316. // SPDLOG_LOGGER_WARN(m_logger, "{} 音量报警线程未找到", roadInfo.strRoadName);
  317. // return false; // 没找到该线程
  318. // }
  319. // m_referCountCalculateDBPhase--; // 引用计数减一
  320. // if(m_referCountCalculateDBPhase <= 0)
  321. // {
  322. // pThreadToRemove->stopThread(); // 停止线程
  323. // m_listCalculateDBPhaseThreads.remove(pThreadToRemove); // 从列表中移除
  324. // delete pThreadToRemove; // 删除线程
  325. // pThreadToRemove = nullptr;
  326. // m_referCountCalculateDBPhase = 0; // 重置引用计数
  327. // SPDLOG_LOGGER_INFO(m_logger, "{} 音量报警线程已销毁", roadInfo.strRoadName);
  328. // }
  329. // return true;
  330. // }