ThreadManager.cpp 15 KB


  1. #include "ThreadManager.h"
  2. #include "CreateWAVThread.h"
  3. #include "CreateDBThread.h"
  4. #include "CalculateDBThread.h"
  5. #include "ConsistencyCompareThread.h"
  6. #include "GlobalVariable.h"
  7. #include "NoiseDetectThread.h"
  8. #include "RtpOneRoadThread.h"
  9. #include "ThreadPool.h"
  10. #include <mutex>
  11. ThreadManager::ThreadManager()
  12. {
  13. m_logger = spdlog::get("ACAServer");
  14. if(m_logger == nullptr)
  15. {
  16. fmt::print("ThreadManager: ACAServer Logger not found.\n");
  17. return;
  18. }
  19. m_logBase = "ThreadManager";
  20. }
  21. /* 启动所有线程 */
  22. // void ThreadManager::startAllThreads()
  23. // {
  24. // }
  25. /* 停止所有线程 */
  26. void ThreadManager::stopAllThreads()
  27. {
  28. }
  29. /* 创建一个录音通道及其附属的线程 */
  30. bool ThreadManager::createRecordThread(const SoundCardRoadInfo_t& roadInfo)
  31. {
  32. /* 需要修改的地方
  33. 1、录音线程修改线程函数,添加task()函数,统一处理开启和关闭状态
  34. 2、原始录音线程,获取分派数据的函数指针不再自己获取,而是由分派线程自己设置过来指针
  35. 3、给存储录音线程的队列添加互斥锁,保护线程的创建和销毁
  36. */
  37. /* 先查找队列中有没有该录音通道 */
  38. return true;
  39. }
  40. /* 销毁一个录音通道及其附属的线程 */
  41. bool ThreadManager::destroyRecordThread(const SoundCardRoadInfo_t& roadInfo)
  42. {
  43. return true;
  44. }
  45. /* 查找录音线程 */
  46. BaseRecordThread* ThreadManager::findRecordThread(EThreadType type, int cardID, int recordID)
  47. {
  48. switch(type)
  49. {
  50. case EThreadType::Type_RecordSrc: /* 录音线程 */
  51. {
  52. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  53. for (auto& pThread : m_recordThreads)
  54. {
  55. if (pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID &&
  56. pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID)
  57. {
  58. return pThread;
  59. }
  60. }
  61. }
  62. break;
  63. case EThreadType::Type_CreateWAV: /* 创建wav小文件和分离左右声道的线程 */
  64. {
  65. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  66. for (auto& pThread : m_createWAVThreads)
  67. {
  68. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  69. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  70. {
  71. return pThread;
  72. }
  73. }
  74. }
  75. break;
  76. case EThreadType::Type_CalculateDBAndPhase: /* 计算音量和反相的线程 */
  77. {
  78. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  79. for (auto& pThread : m_createDBThreads)
  80. {
  81. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  82. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  83. {
  84. return pThread;
  85. }
  86. }
  87. }
  88. break;
  89. case EThreadType::Type_CreateLongWAV: /* 创建长文件的线程 */
  90. {
  91. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  92. for (auto& pThread : m_createLongWAVThreads)
  93. {
  94. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  95. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  96. {
  97. return pThread;
  98. }
  99. }
  100. }
  101. break;
  102. case EThreadType::Type_AssignSrcData: /* 分派数据线程 */
  103. {
  104. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  105. for (auto& pThread : m_assignSrcDataThreads)
  106. {
  107. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  108. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  109. {
  110. return pThread;
  111. }
  112. }
  113. }
  114. break;
  115. case EThreadType::Type_RtpSend: /* RTP发送线程 */
  116. {
  117. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  118. for (auto& pThread : m_rtpSendThreads)
  119. {
  120. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  121. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  122. {
  123. return pThread;
  124. }
  125. }
  126. }
  127. break;
  128. default:
  129. SPDLOG_LOGGER_ERROR(m_logger, "{} 查找录音线程失败,未知线程类型: {}", m_logBase, static_cast<int>(type));
  130. return nullptr; // 未知线程类型
  131. }
  132. return nullptr;
  133. }
  134. /* 获取创建WAV线程指针 */
  135. CreateWAVThread* ThreadManager::getCreateWAVThread(int cardID, int recordID)
  136. {
  137. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  138. for(auto& pThread : m_createWAVThreads)
  139. {
  140. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  141. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  142. {
  143. return dynamic_cast<CreateWAVThread*>(pThread);
  144. }
  145. }
  146. return nullptr;
  147. }
  148. /* 获取创建音量值的线程 */
  149. CreateDBThread* ThreadManager::getCreateDBThread(int cardID, int recordID)
  150. {
  151. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  152. for(auto& pThread : m_createDBThreads)
  153. {
  154. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  155. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  156. {
  157. return dynamic_cast<CreateDBThread*>(pThread);
  158. }
  159. }
  160. return nullptr;
  161. }
  162. /* 获取发送Rtp数据的线程 */
  163. RTPOneRoadThread* ThreadManager::getRtpSendThread(int cardID, int recordID)
  164. {
  165. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  166. for(auto& pThread : m_rtpSendThreads)
  167. {
  168. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  169. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  170. {
  171. return dynamic_cast<RTPOneRoadThread*>(pThread);
  172. }
  173. }
  174. return nullptr;
  175. }
  176. /* -------------------------------------------------------------------------------------------
  177. * 获取计算线程,如果该线程不存在则创建该线程
  178. * 当不需要此线程后,调用remove()函数去掉该线程
  179. * -------------------------------------------------------------------------------------------- */
  180. /* 获取一致性比对线程,线程不存在则创建 */
  181. ConsistencyCompareThread* ThreadManager::getConsistencyCompareThread(const SoundCardRoadInfo_t& roadInfo1, const SoundCardRoadInfo_t& roadInfo2)
  182. {
  183. std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  184. for(const auto pThread : m_listConsistencyCompareThreads)
  185. {
  186. if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  187. {
  188. return pThread; // 找到相同的线程,直接返回
  189. }
  190. }
  191. /* 没找到该线程,创建新的线程 */
  192. CompareItemRoadInfo_t item1;
  193. item1.nCompareRoadNum = 1;
  194. item1.scRoadInfo = roadInfo1;
  195. CompareItemRoadInfo_t item2;
  196. item2.nCompareRoadNum = 2;
  197. item2.scRoadInfo = roadInfo2;
  198. CalculateThreadInfo_t threadInfo;
  199. threadInfo.compareItemInfo.mapRoad.insert(item1.nCompareRoadNum, item1);
  200. threadInfo.compareItemInfo.mapRoad.insert(item2.nCompareRoadNum, item2);
  201. ConsistencyCompareThread* newThread = new ConsistencyCompareThread(threadInfo);
  202. if(newThread == nullptr)
  203. {
  204. SPDLOG_LOGGER_ERROR(m_logger, "创建一致性比对线程失败");
  205. return nullptr; // 创建失败
  206. }
  207. CPPTP.add_task(&ConsistencyCompareThread::threadTask, newThread);
  208. m_listConsistencyCompareThreads.push_back(newThread);
  209. m_referCountConsistencyCompare++; // 引用计数加一
  210. return newThread;
  211. }
  212. /* 去掉线程,线程使用的计数减一,计数为0则销毁该线程 */
  213. bool ThreadManager::removeConsistencyCompareThread(SoundCardRoadInfo_t& roadInfo1, SoundCardRoadInfo_t& roadInfo2)
  214. {
  215. std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  216. ConsistencyCompareThread* pThreadToRemove = nullptr;
  217. for(const auto pThread : m_listConsistencyCompareThreads)
  218. {
  219. if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  220. {
  221. pThreadToRemove = pThread; // 找到相同的线程,直接返回
  222. break;
  223. }
  224. }
  225. if(pThreadToRemove == nullptr)
  226. {
  227. SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程未找到", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  228. roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  229. return false; // 没找到该线程
  230. }
  231. m_referCountConsistencyCompare--; // 引用计数减一
  232. if(m_referCountConsistencyCompare <= 0)
  233. {
  234. /* 停止线程,并一直等待其停止 */
  235. pThreadToRemove->stopThreadBlock();
  236. m_listConsistencyCompareThreads.remove(pThreadToRemove); // 从列表中移除
  237. delete pThreadToRemove; // 删除线程
  238. pThreadToRemove = nullptr;
  239. m_referCountConsistencyCompare = 0; // 重置引用计数
  240. SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程已销毁", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  241. roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  242. }
  243. return true;
  244. }
  245. /* 获取噪音检测线程 */
  246. NoiseDetectThread* ThreadManager::getNoiseDetectThread(const SoundCardRoadInfo_t& roadInfo)
  247. {
  248. std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  249. for(const auto pThread : m_listNoiseDetectThreads)
  250. {
  251. const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  252. if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  253. threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  254. {
  255. return pThread; // 找到相同的线程,直接返回
  256. }
  257. }
  258. /* 没找到该线程,创建新的线程 */
  259. CalculateThreadInfo_t threadInfo;
  260. CompareItemRoadInfo_t item;
  261. item.nCompareRoadNum = 1; // 假设噪音检测线程
  262. item.scRoadInfo = roadInfo;
  263. threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  264. NoiseDetectThread* newThread = new NoiseDetectThread(threadInfo);
  265. if(newThread == nullptr)
  266. {
  267. SPDLOG_LOGGER_ERROR(m_logger, "创建噪音检测线程失败");
  268. return nullptr; // 创建失败
  269. }
  270. CPPTP.add_task(&NoiseDetectThread::threadTask, newThread);
  271. m_listNoiseDetectThreads.push_back(newThread);
  272. m_referCountNoiseDetect++; // 引用计数加一
  273. return newThread;
  274. }
  275. /* 去掉噪音检测线程,线程使用的计数减一,计数为0则销毁该线程 */
  276. bool ThreadManager::removeNoiseDetectThread(SoundCardRoadInfo_t& roadInfo)
  277. {
  278. std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  279. NoiseDetectThread* pThreadToRemove = nullptr;
  280. for(const auto pThread : m_listNoiseDetectThreads)
  281. {
  282. const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  283. if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  284. threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  285. {
  286. pThreadToRemove = pThread; // 找到相同的线程,直接返回
  287. break;
  288. }
  289. }
  290. if(pThreadToRemove == nullptr)
  291. {
  292. SPDLOG_LOGGER_WARN(m_logger, "{}:{} 噪音检测线程未找到", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  293. return false; // 没找到该线程
  294. }
  295. m_referCountNoiseDetect--; // 引用计数减一
  296. if(m_referCountNoiseDetect <= 0)
  297. {
  298. pThreadToRemove->stopThreadBlock(); // 停止线程
  299. m_listNoiseDetectThreads.remove(pThreadToRemove); // 从列表中移除
  300. delete pThreadToRemove; // 删除线程
  301. pThreadToRemove = nullptr;
  302. m_referCountNoiseDetect = 0; // 重置引用计数
  303. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程已销毁", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  304. }
  305. return true;
  306. }
  307. // /* 获取音量报警线程 */
  308. // CalculateDBPhaseThread* ThreadManager::getCalculateDBPhaseThread(const SoundCardRoadInfo_t& roadInfo)
  309. // {
  310. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  311. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  312. // {
  313. // const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  314. // if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  315. // threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  316. // {
  317. // return pThread; // 找到相同的线程,直接返回
  318. // }
  319. // }
  320. // /* 没找到该线程,创建新的线程 */
  321. // CompareItemRoadInfo_t item;
  322. // item.nCompareRoadNum = 1; // 假设音量报警线程
  323. // item.scRoadInfo = roadInfo;
  324. // threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  325. // CalculateDBPhaseThread* newThread = new CalculateDBPhaseThread(threadInfo);
  326. // if(newThread == nullptr)
  327. // {
  328. // SPDLOG_LOGGER_ERROR(m_logger, "创建音量报警线程失败");
  329. // return nullptr; // 创建失败
  330. // }
  331. // CPPTP.add_task(&CalculateDBPhaseThread::threadTask, newThread);
  332. // m_listCalculateDBPhaseThreads.push_back(newThread);
  333. // m_referCountCalculateDBPhase++; // 引用计数加一
  334. // return newThread;
  335. // }
  336. // /* 去掉音量报警线程,线程使用的计数减一,计数为0则销毁该线程 */
  337. // bool ThreadManager::removeCalculateDBPhaseThread(RoadNumberInfo_t& roadInfo)
  338. // {
  339. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  340. // CalculateDBPhaseThread* pThreadToRemove = nullptr;
  341. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  342. // {
  343. // if(pThread->getRoadInfo().roadID == roadInfo.roadID)
  344. // {
  345. // pThreadToRemove = pThread; // 找到相同的线程,直接返回
  346. // break;
  347. // }
  348. // }
  349. // if(pThreadToRemove == nullptr)
  350. // {
  351. // SPDLOG_LOGGER_WARN(m_logger, "{} 音量报警线程未找到", roadInfo.strRoadName);
  352. // return false; // 没找到该线程
  353. // }
  354. // m_referCountCalculateDBPhase--; // 引用计数减一
  355. // if(m_referCountCalculateDBPhase <= 0)
  356. // {
  357. // pThreadToRemove->stopThread(); // 停止线程
  358. // m_listCalculateDBPhaseThreads.remove(pThreadToRemove); // 从列表中移除
  359. // delete pThreadToRemove; // 删除线程
  360. // pThreadToRemove = nullptr;
  361. // m_referCountCalculateDBPhase = 0; // 重置引用计数
  362. // SPDLOG_LOGGER_INFO(m_logger, "{} 音量报警线程已销毁", roadInfo.strRoadName);
  363. // }
  364. // return true;
  365. // }