// ThreadManager.cpp (35 KB)
  1. #include "ThreadManager.h"
  2. #include "CreateWAVThread.h"
  3. #include "CreateDBThread.h"
  4. #include "ConsistencyCompareThread.h"
  5. #include "GlobalVariable.h"
  6. #include "NoiseDetectThread.h"
  7. #include "RtpOneRoadThread.h"
  8. #include "CreateLongFileThread.h"
  9. #include "RecordThread.h"
  10. #include "AssignSrcDataThread.h"
  11. #include "ThreadPool.h"
  12. #include <mutex>
  13. ThreadManager::ThreadManager()
  14. {
  15. m_logger = spdlog::get("ACAServer");
  16. if(m_logger == nullptr)
  17. {
  18. fmt::print("ThreadManager: ACAServer Logger not found.\n");
  19. return;
  20. }
  21. m_logBase = "ThreadManager";
  22. }
  23. /* 启动所有线程 */
  24. // void ThreadManager::startAllThreads()
  25. // {
  26. // }
  27. /* 停止所有线程 */
  28. void ThreadManager::stopAllThreads()
  29. {
  30. }
  31. /* 创建一个录音通道及其附属的线程 */
  32. bool ThreadManager::createRecordThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID)
  33. {
  34. /* 先查找队列中有没有该录音通道 */
  35. std::lock_guard<std::mutex> lock(m_mutexRecordThreadRefCount);
  36. for(const auto& pair : m_mapRecordThreadRefCount)
  37. {
  38. if( pair.first.nSoundCardNum == roadInfo.nSoundCardNum &&
  39. pair.first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  40. {
  41. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 录音线程已存在", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  42. /* 录音线程已存在,增加引用计数
  43. 先判断对比项ID是否已经在列表中了 */
  44. bool idFound = false;
  45. for(const auto& it : pair.second)
  46. {
  47. if(it == compareItemID)
  48. {
  49. idFound = true;
  50. break;
  51. }
  52. }
  53. if(!idFound)
  54. {
  55. /* 如果对比项ID不存在,则添加到引用计数中 */
  56. m_mapRecordThreadRefCount[pair.first].push_back(compareItemID);
  57. }
  58. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数增加,当前计数: {}",
  59. roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, m_mapRecordThreadRefCount[pair.first].size());
  60. /* 返回成功 */
  61. return true;
  62. }
  63. }
  64. /* 录音线程不存在,挨个创建线程 */
  65. /* 先创建生成wav小文件数据的线程 */
  66. RecordThreadInfo_t threadInfo;
  67. threadInfo.cardRoadInfo = roadInfo;
  68. threadInfo.threadState = EThreadState::State_Inited;
  69. threadInfo.threadType = EThreadType::Type_CreateWAV;
  70. CreateWAVThread* pCreateWAVThread = new CreateWAVThread(threadInfo);
  71. if(pCreateWAVThread == nullptr)
  72. {
  73. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成wav小文件线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  74. // return false; // 创建失败
  75. }else
  76. {
  77. CPPTP.add_task(&CreateWAVThread::threadTask, pCreateWAVThread);
  78. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  79. m_createWAVThreads.push_back(pCreateWAVThread);
  80. }
  81. /* 创建计算音量的线程 */
  82. threadInfo.threadType = EThreadType::Type_CreateDB;
  83. CreateDBThread* pCreateDBThread = new CreateDBThread(threadInfo);
  84. if(pCreateDBThread == nullptr)
  85. {
  86. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建计算音量线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  87. // return false; // 创建失败
  88. }else
  89. {
  90. CPPTP.add_task(&CreateDBThread::threadTask, pCreateDBThread);
  91. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  92. m_createDBThreads.push_back(pCreateDBThread);
  93. }
  94. /* 创建生成长文件的线程 */
  95. threadInfo.threadType = EThreadType::Type_CreateLongWAV;
  96. CreateLongFileThread* pCreateLongWAVThread = new CreateLongFileThread(threadInfo);
  97. if(pCreateLongWAVThread == nullptr)
  98. {
  99. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成长文件线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  100. // return false; // 创建失败
  101. }else
  102. {
  103. CPPTP.add_task(&CreateLongFileThread::threadTask, pCreateLongWAVThread);
  104. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  105. m_createLongWAVThreads.push_back(pCreateLongWAVThread);
  106. }
  107. /* 创建发送RTP数据的线程 */
  108. threadInfo.threadType = EThreadType::Type_RtpSend;
  109. CPPTP.add_task(&ThreadManager::thread_RTPSend, threadInfo);
  110. RTPOneRoadThread* pRtpSendThread = new RTPOneRoadThread(threadInfo);
  111. if(pRtpSendThread == nullptr)
  112. {
  113. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建发送RTP数据线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  114. }else
  115. {
  116. CPPTP.add_task(&RTPOneRoadThread::threadTask, pRtpSendThread);
  117. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  118. m_rtpSendThreads.push_back(pRtpSendThread);
  119. }
  120. /* 创建录音线程 */
  121. threadInfo.threadType = EThreadType::Type_RecordSrc;
  122. RecordThread* pRecordThread = new RecordThread(threadInfo);
  123. if(pRecordThread == nullptr)
  124. {
  125. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建录音线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  126. // return false; // 创建失败
  127. }else
  128. {
  129. CPPTP.add_task(&RecordThread::threadTask, pRecordThread);
  130. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  131. m_recordThreads.push_back(pRecordThread);
  132. }
  133. /* 创建分派数据线程 */
  134. threadInfo.threadType = EThreadType::Type_AssignSrcData;
  135. AssignSrcDataThread* pAssignSrcDataThread = new AssignSrcDataThread(threadInfo);
  136. if(pAssignSrcDataThread == nullptr)
  137. {
  138. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建分派数据线程失败", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  139. // return false; // 创建失败
  140. }else
  141. {
  142. CPPTP.add_task(&AssignSrcDataThread::threadTask, pAssignSrcDataThread);
  143. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  144. m_assignSrcDataThreads.push_back(pAssignSrcDataThread);
  145. }
  146. /* 录音线程创建成功,增加引用计数 */
  147. m_mapRecordThreadRefCount[roadInfo].push_back(compareItemID);
  148. return true;
  149. }
  150. /**
  151. 销毁一个录音通道及其附属的线程,如果引用计数为0, 这里只停止该录音通道的所有线程,不会删除实例
  152. 线程实例会由其他管理线程定期去删除
  153. */
  154. bool ThreadManager::removeRecordThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID)
  155. {
  156. std::lock_guard<std::mutex> lock(m_mutexRecordThreadRefCount);
  157. /* 先查找这个引用计数是否存在 */
  158. int refCount = 0;
  159. for(auto& pair : m_mapRecordThreadRefCount)
  160. {
  161. if(pair.first.nSoundCardNum == roadInfo.nSoundCardNum &&
  162. pair.first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  163. {
  164. /* 找到该引用计数,减少引用计数 */
  165. for(auto& it : pair.second)
  166. {
  167. if(it == compareItemID)
  168. {
  169. m_mapRecordThreadRefCount[pair.first].remove(it);
  170. break;
  171. }
  172. }
  173. refCount = m_mapRecordThreadRefCount[pair.first].size();
  174. break;
  175. }
  176. }
  177. if(refCount > 0)
  178. {
  179. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数减少,当前计数: {}",
  180. roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, refCount);
  181. return true;
  182. }
  183. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数为0,即将停止该录音通道的所有线程",
  184. roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  185. /* 引用计数为0,停止该录音通道的所有线程 */
  186. /* 停止录音线程 */
  187. {
  188. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  189. for(auto it : m_recordThreads)
  190. {
  191. const auto& threadInfo = it->getThreadInfo();
  192. if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  193. threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  194. {
  195. it->stopThread();
  196. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 录音线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  197. break;
  198. }
  199. }
  200. }
  201. /* 停止分派任务线程 */
  202. {
  203. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  204. for(auto it : m_assignSrcDataThreads)
  205. {
  206. const auto& threadInfo = it->getThreadInfo();
  207. if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  208. threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  209. {
  210. it->stopThread();
  211. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 分派数据线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  212. break;
  213. }
  214. }
  215. }
  216. /* 停止生成wav小文件线程 */
  217. {
  218. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  219. for(auto it : m_createWAVThreads)
  220. {
  221. const auto& threadInfo = it->getThreadInfo();
  222. if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  223. threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  224. {
  225. it->stopThread();
  226. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 生成wav小文件线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  227. break;
  228. }
  229. }
  230. }
  231. /* 停止计算音量线程 */
  232. {
  233. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  234. for(auto it : m_createDBThreads)
  235. {
  236. const auto& threadInfo = it->getThreadInfo();
  237. if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  238. threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  239. {
  240. it->stopThread();
  241. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 计算音量线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  242. break;
  243. }
  244. }
  245. }
  246. /* 停止生成长文件线程 */
  247. {
  248. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  249. for(auto it : m_createLongWAVThreads)
  250. {
  251. const auto& threadInfo = it->getThreadInfo();
  252. if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  253. threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  254. {
  255. it->stopThread();
  256. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 生成长文件线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  257. break;
  258. }
  259. }
  260. }
  261. /* 停止发送RTP数据线程 */
  262. {
  263. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  264. for(auto it : m_rtpSendThreads)
  265. {
  266. const auto& threadInfo = it->getThreadInfo();
  267. if(threadInfo.cardRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  268. threadInfo.cardRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  269. {
  270. it->stopThread();
  271. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 发送RTP数据线程已停止", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  272. break;
  273. }
  274. }
  275. }
  276. /* 从引用计数中移除该录音通道 */
  277. auto it = m_mapRecordThreadRefCount.find(roadInfo);
  278. if(it != m_mapRecordThreadRefCount.end())
  279. {
  280. m_mapRecordThreadRefCount.erase(it);
  281. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程已停止运行", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  282. } else
  283. {
  284. SPDLOG_LOGGER_WARN(m_logger, "{}:{} 录音线程引用计数未找到", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  285. }
  286. // 设置销毁录音线程标志
  287. m_isDestroyeRecordThread.store(true);
  288. return true;
  289. }
  290. /* 销毁录音线程 */
  291. void ThreadManager::destroyeRecordThread()
  292. {
  293. if(!m_isDestroyeRecordThread.load())
  294. {
  295. /* 没有需要销毁的线程 */
  296. return;
  297. }
  298. /* 销毁录音线程 */
  299. {
  300. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  301. for(auto it = m_recordThreads.begin(); it != m_recordThreads.end(); )
  302. {
  303. BaseRecordThread* pThread = *it;
  304. if(pThread != nullptr)
  305. {
  306. if(pThread->getThreadInfo().threadState == EThreadState::State_Running)
  307. {
  308. auto pRecordThread = dynamic_cast<RecordThread*>(pThread);
  309. delete pRecordThread; // 删除线程
  310. pThread = nullptr;
  311. it = m_recordThreads.erase(it); // 从列表中移除
  312. }
  313. } else
  314. {
  315. it++;
  316. }
  317. }
  318. }
  319. /* 销毁分派线程 */
  320. {
  321. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  322. for(auto it = m_assignSrcDataThreads.begin(); it != m_assignSrcDataThreads.end(); )
  323. {
  324. BaseRecordThread* pThread = *it;
  325. if(pThread != nullptr)
  326. {
  327. if(pThread->getThreadInfo().threadState == EThreadState::State_Running)
  328. {
  329. auto pAssignSrcDataThread = dynamic_cast<AssignSrcDataThread*>(pThread);
  330. delete pAssignSrcDataThread; // 删除线程
  331. pThread = nullptr;
  332. it = m_assignSrcDataThreads.erase(it); // 从列表中移除
  333. }
  334. } else
  335. {
  336. it++;
  337. }
  338. }
  339. }
  340. /* 销毁生成wav小文件线程 */
  341. {
  342. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  343. for(auto it = m_createWAVThreads.begin(); it != m_createWAVThreads.end(); )
  344. {
  345. BaseRecordThread* pThread = *it;
  346. if(pThread != nullptr)
  347. {
  348. if(pThread->getThreadInfo().threadState == EThreadState::State_Running)
  349. {
  350. auto pCreateWAVThread = dynamic_cast<CreateWAVThread*>(pThread);
  351. delete pCreateWAVThread; // 删除线程
  352. pThread = nullptr;
  353. it = m_createWAVThreads.erase(it); // 从列表中移除
  354. }
  355. } else
  356. {
  357. it++;
  358. }
  359. }
  360. }
  361. /* 销毁计算音量线程 */
  362. {
  363. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  364. for(auto it = m_createDBThreads.begin(); it != m_createDBThreads.end(); )
  365. {
  366. BaseRecordThread* pThread = *it;
  367. if(pThread != nullptr)
  368. {
  369. if(pThread->getThreadInfo().threadState == EThreadState::State_Running)
  370. {
  371. auto pCreateDBThread = dynamic_cast<CreateDBThread*>(pThread);
  372. delete pCreateDBThread; // 删除线程
  373. pThread = nullptr;
  374. it = m_createDBThreads.erase(it); // 从列表中移除
  375. }
  376. } else
  377. {
  378. it++;
  379. }
  380. }
  381. }
  382. /* 销毁生成长文件线程 */
  383. {
  384. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  385. for(auto it = m_createLongWAVThreads.begin(); it != m_createLongWAVThreads.end(); )
  386. {
  387. BaseRecordThread* pThread = *it;
  388. if(pThread != nullptr)
  389. {
  390. if(pThread->getThreadInfo().threadState == EThreadState::State_Running)
  391. {
  392. auto pCreateLongWAVThread = dynamic_cast<CreateLongFileThread*>(pThread);
  393. delete pCreateLongWAVThread; // 删除线程
  394. pThread = nullptr;
  395. it = m_createLongWAVThreads.erase(it); // 从列表中移除
  396. }
  397. } else
  398. {
  399. it++;
  400. }
  401. }
  402. }
  403. /* 销毁发送RTP数据线程 */
  404. {
  405. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  406. for(auto it = m_rtpSendThreads.begin(); it != m_rtpSendThreads.end(); )
  407. {
  408. BaseRecordThread* pThread = *it;
  409. if(pThread != nullptr)
  410. {
  411. if(pThread->getThreadInfo().threadState == EThreadState::State_Running)
  412. {
  413. auto pRtpSendThread = dynamic_cast<RTPOneRoadThread*>(pThread);
  414. delete pRtpSendThread; // 删除线程
  415. pThread = nullptr;
  416. it = m_rtpSendThreads.erase(it); // 从列表中移除
  417. }
  418. } else
  419. {
  420. it++;
  421. }
  422. }
  423. }
  424. /* 销毁标志位置为false */
  425. m_isDestroyeRecordThread.store(false);
  426. }
  427. /* 查找录音线程 */
  428. BaseRecordThread* ThreadManager::findRecordThread(EThreadType type, int cardID, int recordID)
  429. {
  430. switch(type)
  431. {
  432. case EThreadType::Type_RecordSrc: /* 录音线程 */
  433. {
  434. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  435. for (auto& pThread : m_recordThreads)
  436. {
  437. if (pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID &&
  438. pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID)
  439. {
  440. return pThread;
  441. }
  442. }
  443. }
  444. break;
  445. case EThreadType::Type_CreateWAV: /* 创建wav小文件和分离左右声道的线程 */
  446. {
  447. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  448. for (auto& pThread : m_createWAVThreads)
  449. {
  450. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  451. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  452. {
  453. return pThread;
  454. }
  455. }
  456. }
  457. break;
  458. case EThreadType::Type_CreateDB: /* 计算音量和反相的线程 */
  459. {
  460. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  461. for (auto& pThread : m_createDBThreads)
  462. {
  463. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  464. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  465. {
  466. return pThread;
  467. }
  468. }
  469. }
  470. break;
  471. case EThreadType::Type_CreateLongWAV: /* 创建长文件的线程 */
  472. {
  473. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  474. for (auto& pThread : m_createLongWAVThreads)
  475. {
  476. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  477. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  478. {
  479. return pThread;
  480. }
  481. }
  482. }
  483. break;
  484. case EThreadType::Type_AssignSrcData: /* 分派数据线程 */
  485. {
  486. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  487. for (auto& pThread : m_assignSrcDataThreads)
  488. {
  489. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  490. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  491. {
  492. return pThread;
  493. }
  494. }
  495. }
  496. break;
  497. case EThreadType::Type_RtpSend: /* RTP发送线程 */
  498. {
  499. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  500. for (auto& pThread : m_rtpSendThreads)
  501. {
  502. if (pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  503. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  504. {
  505. return pThread;
  506. }
  507. }
  508. }
  509. break;
  510. default:
  511. SPDLOG_LOGGER_ERROR(m_logger, "{} 查找录音线程失败,未知线程类型: {}", m_logBase, static_cast<int>(type));
  512. return nullptr; // 未知线程类型
  513. }
  514. return nullptr;
  515. }
  516. /* 获取创建WAV线程指针 */
  517. CreateWAVThread* ThreadManager::getCreateWAVThread(int cardID, int recordID)
  518. {
  519. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  520. for(auto& pThread : m_createWAVThreads)
  521. {
  522. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  523. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  524. {
  525. return dynamic_cast<CreateWAVThread*>(pThread);
  526. }
  527. }
  528. return nullptr;
  529. }
  530. /* 获取创建音量值的线程 */
  531. CreateDBThread* ThreadManager::getCreateDBThread(int cardID, int recordID)
  532. {
  533. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  534. for(auto& pThread : m_createDBThreads)
  535. {
  536. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  537. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  538. {
  539. return dynamic_cast<CreateDBThread*>(pThread);
  540. }
  541. }
  542. return nullptr;
  543. }
  544. /* 获取发送Rtp数据的线程 */
  545. RTPOneRoadThread* ThreadManager::getRtpSendThread(int cardID, int recordID)
  546. {
  547. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  548. for(auto& pThread : m_rtpSendThreads)
  549. {
  550. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  551. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  552. {
  553. return dynamic_cast<RTPOneRoadThread*>(pThread);
  554. }
  555. }
  556. return nullptr;
  557. }
  558. /* 获取录制报警文件的线程 */
  559. CreateLongFileThread* ThreadManager::getCreateLongFileThread(int cardID, int recordID)
  560. {
  561. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  562. for(auto& pThread : m_createLongWAVThreads)
  563. {
  564. if(pThread->getThreadInfo().cardRoadInfo.nSoundCardNum == cardID &&
  565. pThread->getThreadInfo().cardRoadInfo.roadInfo.nRoadNum == recordID)
  566. {
  567. return dynamic_cast<CreateLongFileThread*>(pThread);
  568. }
  569. }
  570. return nullptr;
  571. }
  572. /* -------------------------------------------------------------------------------------------
  573. * 获取计算线程,如果该线程不存在则创建该线程
  574. * 当不需要此线程后,调用remove()函数去掉该线程
  575. * -------------------------------------------------------------------------------------------- */
  576. /* 获取一致性比对线程,线程不存在则创建 */
  577. // ConsistencyCompareThread* ThreadManager::getConsistencyCompareThread(const SoundCardRoadInfo_t& roadInfo1, const SoundCardRoadInfo_t& roadInfo2)
  578. // {
  579. // std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  580. // for(const auto pThread : m_listConsistencyCompareThreads)
  581. // {
  582. // if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  583. // {
  584. // return pThread; // 找到相同的线程,直接返回
  585. // }
  586. // }
  587. // /* 没找到该线程,创建新的线程 */
  588. // CompareItemRoadInfo_t item1;
  589. // item1.nCompareRoadNum = 1;
  590. // item1.scRoadInfo = roadInfo1;
  591. // CompareItemRoadInfo_t item2;
  592. // item2.nCompareRoadNum = 2;
  593. // item2.scRoadInfo = roadInfo2;
  594. // CalculateThreadInfo_t threadInfo;
  595. // threadInfo.compareItemInfo.mapRoad.insert(item1.nCompareRoadNum, item1);
  596. // threadInfo.compareItemInfo.mapRoad.insert(item2.nCompareRoadNum, item2);
  597. // ConsistencyCompareThread* newThread = new ConsistencyCompareThread(threadInfo);
  598. // if(newThread == nullptr)
  599. // {
  600. // SPDLOG_LOGGER_ERROR(m_logger, "创建一致性比对线程失败");
  601. // return nullptr; // 创建失败
  602. // }
  603. // CPPTP.add_task(&ConsistencyCompareThread::threadTask, newThread);
  604. // m_listConsistencyCompareThreads.push_back(newThread);
  605. // m_referCountConsistencyCompare++; // 引用计数加一
  606. // return newThread;
  607. // }
  608. /* 去掉线程,线程使用的计数减一,计数为0则销毁该线程 */
  609. // bool ThreadManager::removeConsistencyCompareThread(SoundCardRoadInfo_t& roadInfo1, SoundCardRoadInfo_t& roadInfo2)
  610. // {
  611. // std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  612. // ConsistencyCompareThread* pThreadToRemove = nullptr;
  613. // for(const auto pThread : m_listConsistencyCompareThreads)
  614. // {
  615. // if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  616. // {
  617. // pThreadToRemove = pThread; // 找到相同的线程,直接返回
  618. // break;
  619. // }
  620. // }
  621. // if(pThreadToRemove == nullptr)
  622. // {
  623. // SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程未找到", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  624. // roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  625. // return false; // 没找到该线程
  626. // }
  627. // m_referCountConsistencyCompare--; // 引用计数减一
  628. // if(m_referCountConsistencyCompare <= 0)
  629. // {
  630. // /* 停止线程,并一直等待其停止 */
  631. // pThreadToRemove->stopThreadBlock();
  632. // m_listConsistencyCompareThreads.remove(pThreadToRemove); // 从列表中移除
  633. // delete pThreadToRemove; // 删除线程
  634. // pThreadToRemove = nullptr;
  635. // m_referCountConsistencyCompare = 0; // 重置引用计数
  636. // SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程已销毁", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  637. // roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  638. // }
  639. // return true;
  640. // }
  641. /* 获取噪音检测线程 */
  642. NoiseDetectThread* ThreadManager::getNoiseDetectThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID)
  643. {
  644. std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  645. NoiseDetectThread* pNoiseDetectThread = nullptr;
  646. for(const auto pThread : m_listNoiseDetectThreads)
  647. {
  648. const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  649. if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  650. threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  651. {
  652. pNoiseDetectThread = pThread;
  653. break;
  654. }
  655. }
  656. /* 判断引用计数是否需要增加 */
  657. if(pNoiseDetectThread != nullptr)
  658. {
  659. /* 查找这个通道是否在列表中(实际上肯定在列表中) */
  660. bool isExist = false;
  661. for(auto it = m_mapNoiseDetectThreadRefCount.begin(); it != m_mapNoiseDetectThreadRefCount.end(); ++it)
  662. {
  663. if(it->first.nSoundCardNum == roadInfo.nSoundCardNum &&
  664. it->first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  665. {
  666. bool isFound = false;
  667. for(auto& id : it->second)
  668. {
  669. if(id == compareItemID)
  670. {
  671. isFound = true;
  672. break; // 找到相同的对比项ID,直接返回
  673. }
  674. }
  675. if(!isFound)
  676. {
  677. it->second.push_back(compareItemID); // 添加新的对比项ID
  678. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 噪音检测线程引用计数增加,当前计数: {}",
  679. roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, it->second.size());
  680. }
  681. isExist = true;
  682. break;
  683. }
  684. }
  685. if(!isExist)
  686. {
  687. /* 不在引用计数的列表中,添加进入 */
  688. m_mapNoiseDetectThreadRefCount[roadInfo].push_back(compareItemID);
  689. }
  690. return pNoiseDetectThread;
  691. }
  692. /* 没找到该线程,创建新的线程 */
  693. CalculateThreadInfo_t threadInfo;
  694. CompareItemRoadInfo_t item;
  695. item.nCompareRoadNum = 1; // 假设噪音检测线程
  696. item.scRoadInfo = roadInfo;
  697. threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  698. NoiseDetectThread* newThread = new NoiseDetectThread(threadInfo);
  699. if(newThread == nullptr)
  700. {
  701. SPDLOG_LOGGER_ERROR(m_logger, "创建噪音检测线程失败");
  702. return nullptr; // 创建失败
  703. }
  704. CPPTP.add_task(&NoiseDetectThread::threadTask, newThread);
  705. m_listNoiseDetectThreads.push_back(newThread);
  706. /* 不在引用计数的列表中,添加进入 */
  707. m_mapNoiseDetectThreadRefCount[roadInfo].push_back(compareItemID);
  708. return newThread;
  709. }
  710. /* 去掉噪音检测线程,线程使用的计数减一,计数为0则销毁该线程 */
  711. bool ThreadManager::removeNoiseDetectThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID)
  712. {
  713. std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  714. NoiseDetectThread* pThreadToRemove = nullptr;
  715. for(const auto pThread : m_listNoiseDetectThreads)
  716. {
  717. const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  718. if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  719. threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  720. {
  721. pThreadToRemove = pThread; // 找到相同的线程,直接返回
  722. break;
  723. }
  724. }
  725. if(pThreadToRemove == nullptr)
  726. {
  727. SPDLOG_LOGGER_WARN(m_logger, "{}:{} 噪音检测线程未找到", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  728. return false; // 没找到该线程
  729. }
  730. /* 引用计数减一 */
  731. int useCount = 0;
  732. for(auto it = m_mapNoiseDetectThreadRefCount.begin(); it != m_mapNoiseDetectThreadRefCount.end(); ++it)
  733. {
  734. if(it->first.nSoundCardNum == roadInfo.nSoundCardNum &&
  735. it->first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  736. {
  737. /* 将对比项ID从列表中删除 */
  738. auto& compareItemList = it->second;
  739. auto itemIt = std::find(compareItemList.begin(), compareItemList.end(), compareItemID);
  740. if(itemIt != compareItemList.end())
  741. {
  742. compareItemList.erase(itemIt); // 移除该对比项ID
  743. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 噪音检测线程引用计数减少,当前计数: {}",
  744. roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, compareItemList.size());
  745. }
  746. useCount = compareItemList.size(); // 获取当前引用计数
  747. if(useCount <= 0)
  748. {
  749. m_mapNoiseDetectThreadRefCount.erase(it); // 如果引用计数为0,则从列表中移除
  750. }
  751. break; // 找到后退出循环
  752. }
  753. }
  754. if(useCount <= 0)
  755. {
  756. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程引用计数为0,准备销毁该线程",
  757. roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  758. pThreadToRemove->stopThreadBlock(); // 停止线程
  759. m_listNoiseDetectThreads.remove(pThreadToRemove); // 从列表中移除
  760. delete pThreadToRemove; // 删除线程
  761. pThreadToRemove = nullptr;
  762. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程已销毁", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  763. }
  764. return true;
  765. }
  766. // /* 获取音量报警线程 */
  767. // CalculateDBPhaseThread* ThreadManager::getCalculateDBPhaseThread(const SoundCardRoadInfo_t& roadInfo)
  768. // {
  769. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  770. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  771. // {
  772. // const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  773. // if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  774. // threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  775. // {
  776. // return pThread; // 找到相同的线程,直接返回
  777. // }
  778. // }
  779. // /* 没找到该线程,创建新的线程 */
  780. // CompareItemRoadInfo_t item;
  781. // item.nCompareRoadNum = 1; // 假设音量报警线程
  782. // item.scRoadInfo = roadInfo;
  783. // threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  784. // CalculateDBPhaseThread* newThread = new CalculateDBPhaseThread(threadInfo);
  785. // if(newThread == nullptr)
  786. // {
  787. // SPDLOG_LOGGER_ERROR(m_logger, "创建音量报警线程失败");
  788. // return nullptr; // 创建失败
  789. // }
  790. // CPPTP.add_task(&CalculateDBPhaseThread::threadTask, newThread);
  791. // m_listCalculateDBPhaseThreads.push_back(newThread);
  792. // m_referCountCalculateDBPhase++; // 引用计数加一
  793. // return newThread;
  794. // }
  795. // /* 去掉音量报警线程,线程使用的计数减一,计数为0则销毁该线程 */
  796. // bool ThreadManager::removeCalculateDBPhaseThread(RoadNumberInfo_t& roadInfo)
  797. // {
  798. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  799. // CalculateDBPhaseThread* pThreadToRemove = nullptr;
  800. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  801. // {
  802. // if(pThread->getRoadInfo().roadID == roadInfo.roadID)
  803. // {
  804. // pThreadToRemove = pThread; // 找到相同的线程,直接返回
  805. // break;
  806. // }
  807. // }
  808. // if(pThreadToRemove == nullptr)
  809. // {
  810. // SPDLOG_LOGGER_WARN(m_logger, "{} 音量报警线程未找到", roadInfo.strRoadName);
  811. // return false; // 没找到该线程
  812. // }
  813. // m_referCountCalculateDBPhase--; // 引用计数减一
  814. // if(m_referCountCalculateDBPhase <= 0)
  815. // {
  816. // pThreadToRemove->stopThread(); // 停止线程
  817. // m_listCalculateDBPhaseThreads.remove(pThreadToRemove); // 从列表中移除
  818. // delete pThreadToRemove; // 删除线程
  819. // pThreadToRemove = nullptr;
  820. // m_referCountCalculateDBPhase = 0; // 重置引用计数
  821. // SPDLOG_LOGGER_INFO(m_logger, "{} 音量报警线程已销毁", roadInfo.strRoadName);
  822. // }
  823. // return true;
  824. // }
  825. /* RTP线程函数,套一层壳 */
  826. void ThreadManager::thread_RTPSend(RecordThreadInfo_t& threadInfo)
  827. {
  828. RTPOneRoadThread* pRtpSendThread = new RTPOneRoadThread(threadInfo);
  829. if(pRtpSendThread == nullptr)
  830. {
  831. SPDLOG_ERROR("{}:{} 创建RTP发送线程失败", threadInfo.cardRoadInfo.strSoundCardName.toStdString(), threadInfo.cardRoadInfo.roadInfo.nRoadNum);
  832. return;
  833. }
  834. /* 先加入队列,再开启线程 */
  835. std::lock_guard<std::mutex> lock(ThreadMan.m_mutexRtpSendThreads);
  836. ThreadMan.m_rtpSendThreads.push_back(pRtpSendThread);
  837. // pRtpSendThread->threadTask();
  838. }