ThreadManager.cpp 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914
  1. #include "ThreadManager.h"
  2. #include "CreateWAVThread.h"
  3. #include "CreateDBThread.h"
  4. #include "ConsistencyCompareThread.h"
  5. #include "GlobalVariable.h"
  6. #include "NoiseDetectThread.h"
  7. #include "RtpOneRoadThread.h"
  8. #include "CreateRecordFileThread.h"
  9. #include "RecordThread.h"
  10. #include "AssignSrcDataThread.h"
  11. #include "ThreadPool.h"
  12. #include <mutex>
  13. #include <string>
  14. ThreadManager::ThreadManager()
  15. {
  16. m_logger = spdlog::get("ACAServer");
  17. if(m_logger == nullptr)
  18. {
  19. fmt::print("ThreadManager: ACAServer Logger not found.\n");
  20. return;
  21. }
  22. m_logBase = "ThreadManager";
  23. }
  24. /* 启动所有线程 */
  25. // void ThreadManager::startAllThreads()
  26. // {
  27. // }
  28. /* 停止所有线程 */
  29. void ThreadManager::stopAllThreads()
  30. {
  31. }
  32. /* 创建一个录音通道及其附属的线程 */
  33. bool ThreadManager::createRecordThread(const OneSoundCardPCMInfo_t& pcmInfo, int compareItemID)
  34. {
  35. /* 先查找队列中有没有该录音通道 */
  36. std::unique_lock<std::mutex> lock(m_mutexRecordThreadRefCount);
  37. for(const auto& pair : m_mapRecordThreadRefCount)
  38. {
  39. if( pair.first == pcmInfo.pcmInfo.strPCMName)
  40. {
  41. SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 录音线程已存在", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  42. /* 录音线程已存在,增加引用计数
  43. 先判断对比项ID是否已经在列表中了 */
  44. bool idFound = false;
  45. for(const auto& it : pair.second)
  46. {
  47. if(it == compareItemID)
  48. {
  49. idFound = true;
  50. break;
  51. }
  52. }
  53. if(!idFound)
  54. {
  55. /* 如果对比项ID不存在,则添加到引用计数中 */
  56. m_mapRecordThreadRefCount[pair.first].push_back(compareItemID);
  57. }
  58. SPDLOG_LOGGER_INFO(m_logger, "{}:{} 录音线程引用计数增加,当前计数: {}",
  59. pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName, m_mapRecordThreadRefCount[pair.first].size());
  60. /* 返回成功 */
  61. return true;
  62. }
  63. }
  64. /* 录音线程不存在,挨个创建线程 */
  65. /* 先创建生成wav小文件数据的线程 */
  66. RecordThreadInfo_t threadInfo;
  67. threadInfo.cardRoadInfo = pcmInfo;
  68. threadInfo.threadState = EThreadState::State_Inited;
  69. threadInfo.threadType = EThreadType::Type_CreateWAV;
  70. CreateWAVThread* pCreateWAVThread = new CreateWAVThread(threadInfo);
  71. if(pCreateWAVThread == nullptr)
  72. {
  73. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成wav小文件线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  74. // return false; // 创建失败
  75. }else
  76. {
  77. CPPTP.add_task(&CreateWAVThread::thread_task, pCreateWAVThread);
  78. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  79. m_createWAVThreads.push_back(pCreateWAVThread);
  80. }
  81. /* 创建计算音量的线程 */
  82. threadInfo.threadType = EThreadType::Type_CreateDB;
  83. CreateDBThread* pCreateDBThread = new CreateDBThread(threadInfo);
  84. if(pCreateDBThread == nullptr)
  85. {
  86. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建计算音量线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  87. // return false; // 创建失败
  88. }else
  89. {
  90. CPPTP.add_task(&CreateDBThread::thread_task, pCreateDBThread);
  91. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  92. m_createDBThreads.push_back(pCreateDBThread);
  93. }
  94. /* 创建生成长文件的线程 */
  95. threadInfo.threadType = EThreadType::Type_CreateLongWAV;
  96. CreateRecordFileThread* pCreateLongWAVThread = new CreateRecordFileThread(threadInfo);
  97. if(pCreateLongWAVThread == nullptr)
  98. {
  99. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建生成长文件线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  100. // return false; // 创建失败
  101. }else
  102. {
  103. CPPTP.add_task(&CreateRecordFileThread::thread_task, pCreateLongWAVThread);
  104. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  105. m_createLongWAVThreads.push_back(pCreateLongWAVThread);
  106. }
  107. /* 创建发送RTP数据的线程 */
  108. threadInfo.threadType = EThreadType::Type_RtpSend;
  109. CPPTP.add_task(&ThreadManager::thread_RTPSend, threadInfo);
  110. // RTPOneRoadThread* pRtpSendThread = new RTPOneRoadThread(threadInfo);
  111. // if(pRtpSendThread == nullptr)
  112. // {
  113. // SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建发送RTP数据线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  114. // }else
  115. // {
  116. // CPPTP.add_task(&RTPOneRoadThread::threadTask, pRtpSendThread);
  117. // std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  118. // m_rtpSendThreads.push_back(pRtpSendThread);
  119. // }
  120. /* 创建录音线程 */
  121. threadInfo.threadType = EThreadType::Type_RecordSrc;
  122. RecordThread* pRecordThread = new RecordThread(threadInfo);
  123. if(pRecordThread == nullptr)
  124. {
  125. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建录音线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  126. // return false; // 创建失败
  127. }else
  128. {
  129. CPPTP.add_task(&RecordThread::thread_task, pRecordThread);
  130. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  131. m_recordThreads.push_back(pRecordThread);
  132. }
  133. /* 创建分派数据线程 */
  134. threadInfo.threadType = EThreadType::Type_AssignSrcData;
  135. AssignSrcDataThread* pAssignSrcDataThread = new AssignSrcDataThread(threadInfo);
  136. if(pAssignSrcDataThread == nullptr)
  137. {
  138. SPDLOG_LOGGER_ERROR(m_logger, "{}:{} 创建分派数据线程失败", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  139. // return false; // 创建失败
  140. }else
  141. {
  142. CPPTP.add_task(&AssignSrcDataThread::thread_task, pAssignSrcDataThread);
  143. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  144. m_assignSrcDataThreads.push_back(pAssignSrcDataThread);
  145. }
  146. /* 录音线程创建成功,增加引用计数 */
  147. m_mapRecordThreadRefCount[pcmInfo.pcmInfo.strPCMName].push_back(compareItemID);
  148. return true;
  149. }
  150. /**
  151. 销毁一个录音通道及其附属的线程,如果引用计数为0, 这里只停止该录音通道的所有线程,不会删除实例
  152. 线程实例会由其他管理线程定期去删除
  153. */
  154. bool ThreadManager::removeRecordThread(const OneSoundCardPCMInfo_t& pcmInfo, int compareItemID)
  155. {
  156. std::unique_lock<std::mutex> lock(m_mutexRecordThreadRefCount);
  157. /* 先查找这个引用计数是否存在 */
  158. int refCount = 0;
  159. for(auto& pair : m_mapRecordThreadRefCount)
  160. {
  161. if(pair.first == pcmInfo.pcmInfo.strPCMName)
  162. {
  163. /* 找到该引用计数,减少引用计数 */
  164. for(auto& it : pair.second)
  165. {
  166. if(it == compareItemID)
  167. {
  168. m_mapRecordThreadRefCount[pair.first].remove(it);
  169. break;
  170. }
  171. }
  172. refCount = m_mapRecordThreadRefCount[pair.first].size();
  173. break;
  174. }
  175. }
  176. std::string logBase = fmt::format("{}:{}", pcmInfo.strSoundCardName, pcmInfo.pcmInfo.strPCMName);
  177. if(refCount > 0)
  178. {
  179. SPDLOG_LOGGER_INFO(m_logger, "{} 录音线程引用计数减少,当前计数: {}", logBase, refCount);
  180. return true;
  181. }
  182. SPDLOG_LOGGER_INFO(m_logger, "{} 录音线程引用计数为0,即将停止该录音通道的所有线程", logBase);
  183. /* 引用计数为0,停止该录音通道的所有线程 */
  184. /* 停止录音线程 */
  185. {
  186. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  187. for(auto it : m_recordThreads)
  188. {
  189. const auto& threadInfo = it->getThreadInfo();
  190. if(threadInfo.cardRoadInfo.pcmInfo.strPCMName == pcmInfo.pcmInfo.strPCMName)
  191. {
  192. it->thread_stop_block();
  193. SPDLOG_LOGGER_TRACE(m_logger, "{} 录音线程已停止", logBase);
  194. break;
  195. }
  196. }
  197. }
  198. /* 停止分派任务线程 */
  199. {
  200. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  201. for(auto it : m_assignSrcDataThreads)
  202. {
  203. const auto& threadInfo = it->getThreadInfo();
  204. if(threadInfo.cardRoadInfo.pcmInfo.strPCMName == pcmInfo.pcmInfo.strPCMName)
  205. {
  206. it->thread_stop_block();
  207. SPDLOG_LOGGER_TRACE(m_logger, "{} 分派数据线程已停止", logBase);
  208. break;
  209. }
  210. }
  211. }
  212. /* 停止生成wav小文件线程 */
  213. {
  214. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  215. for(auto it : m_createWAVThreads)
  216. {
  217. const auto& threadInfo = it->getThreadInfo();
  218. if(threadInfo.cardRoadInfo.pcmInfo.strPCMName == pcmInfo.pcmInfo.strPCMName)
  219. {
  220. it->thread_stop_block();
  221. SPDLOG_LOGGER_TRACE(m_logger, "{} 生成wav小文件线程已停止", logBase);
  222. break;
  223. }
  224. }
  225. }
  226. /* 停止计算音量线程 */
  227. {
  228. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  229. for(auto it : m_createDBThreads)
  230. {
  231. const auto& threadInfo = it->getThreadInfo();
  232. if(threadInfo.cardRoadInfo.pcmInfo.strPCMName == pcmInfo.pcmInfo.strPCMName)
  233. {
  234. it->thread_stop_block();
  235. SPDLOG_LOGGER_TRACE(m_logger, "{} 计算音量线程已停止", logBase);
  236. break;
  237. }
  238. }
  239. }
  240. /* 停止生成长文件线程 */
  241. {
  242. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  243. for(auto it : m_createLongWAVThreads)
  244. {
  245. const auto& threadInfo = it->getThreadInfo();
  246. if(threadInfo.cardRoadInfo.pcmInfo.strPCMName == pcmInfo.pcmInfo.strPCMName)
  247. {
  248. it->thread_stop_block();
  249. SPDLOG_LOGGER_TRACE(m_logger, "{} 生成长文件线程已停止", logBase);
  250. break;
  251. }
  252. }
  253. }
  254. /* 停止发送RTP数据线程 */
  255. {
  256. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  257. for(auto it : m_rtpSendThreads)
  258. {
  259. const auto& threadInfo = it->getThreadInfo();
  260. if(threadInfo.cardRoadInfo.pcmInfo.strPCMName == pcmInfo.pcmInfo.strPCMName)
  261. {
  262. it->thread_stop_block();
  263. SPDLOG_LOGGER_TRACE(m_logger, "{} 发送RTP数据线程已停止", logBase);
  264. break;
  265. }
  266. }
  267. }
  268. /* 从引用计数中移除该录音通道 */
  269. auto it = m_mapRecordThreadRefCount.find(pcmInfo.pcmInfo.strPCMName);
  270. if(it != m_mapRecordThreadRefCount.end())
  271. {
  272. m_mapRecordThreadRefCount.erase(it);
  273. SPDLOG_LOGGER_INFO(m_logger, "{} 录音线程已停止运行", logBase);
  274. } else
  275. {
  276. SPDLOG_LOGGER_WARN(m_logger, "{} 录音线程引用计数未找到", logBase);
  277. }
  278. // 设置销毁录音线程标志
  279. m_isDestroyeRecordThread.store(true);
  280. m_condVarDestroyRecord.notify_one();
  281. return true;
  282. }
  283. /* 销毁录音线程 */
  284. void ThreadManager::thread_destroyeRecordThread()
  285. {
  286. std::unique_lock<std::mutex> m_lock(m_mutexRecordThreadRefCount);
  287. m_condVarDestroyRecord.wait(m_lock, [this]() {
  288. return m_isDestroyeRecordThread.load();
  289. });
  290. if(!m_isDestroyeRecordThread.load())
  291. {
  292. /* 没有需要销毁的线程 */
  293. return;
  294. }
  295. /* 销毁录音线程 */
  296. {
  297. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  298. for(auto it = m_recordThreads.begin(); it != m_recordThreads.end(); )
  299. {
  300. BaseRecordThread* pThread = *it;
  301. if(pThread != nullptr)
  302. {
  303. auto threadState = pThread->getThreadInfo().threadState;
  304. if(EThreadState::State_Stopped == threadState ||
  305. EThreadState::State_Error == threadState )
  306. {
  307. auto pRecordThread = dynamic_cast<RecordThread*>(pThread);
  308. delete pRecordThread; // 删除线程
  309. pThread = nullptr;
  310. it = m_recordThreads.erase(it); // 从列表中移除
  311. }
  312. } else
  313. {
  314. it++;
  315. }
  316. }
  317. }
  318. /* 销毁分派线程 */
  319. {
  320. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  321. for(auto it = m_assignSrcDataThreads.begin(); it != m_assignSrcDataThreads.end(); )
  322. {
  323. BaseRecordThread* pThread = *it;
  324. if(pThread != nullptr)
  325. {
  326. auto threadState = pThread->getThreadInfo().threadState;
  327. if(EThreadState::State_Stopped == threadState ||
  328. EThreadState::State_Error == threadState )
  329. {
  330. auto pAssignSrcDataThread = dynamic_cast<AssignSrcDataThread*>(pThread);
  331. delete pAssignSrcDataThread; // 删除线程
  332. pThread = nullptr;
  333. it = m_assignSrcDataThreads.erase(it); // 从列表中移除
  334. }
  335. } else
  336. {
  337. it++;
  338. }
  339. }
  340. }
  341. /* 销毁生成wav小文件线程 */
  342. {
  343. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  344. for(auto it = m_createWAVThreads.begin(); it != m_createWAVThreads.end(); )
  345. {
  346. BaseRecordThread* pThread = *it;
  347. if(pThread != nullptr)
  348. {
  349. auto threadState = pThread->getThreadInfo().threadState;
  350. if(EThreadState::State_Stopped == threadState ||
  351. EThreadState::State_Error == threadState )
  352. {
  353. auto pCreateWAVThread = dynamic_cast<CreateWAVThread*>(pThread);
  354. delete pCreateWAVThread; // 删除线程
  355. pThread = nullptr;
  356. it = m_createWAVThreads.erase(it); // 从列表中移除
  357. }
  358. } else
  359. {
  360. it++;
  361. }
  362. }
  363. }
  364. /* 销毁计算音量线程 */
  365. {
  366. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  367. for(auto it = m_createDBThreads.begin(); it != m_createDBThreads.end(); )
  368. {
  369. BaseRecordThread* pThread = *it;
  370. if(pThread != nullptr)
  371. {
  372. auto threadState = pThread->getThreadInfo().threadState;
  373. if(EThreadState::State_Stopped == threadState ||
  374. EThreadState::State_Error == threadState )
  375. {
  376. auto pCreateDBThread = dynamic_cast<CreateDBThread*>(pThread);
  377. delete pCreateDBThread; // 删除线程
  378. pThread = nullptr;
  379. it = m_createDBThreads.erase(it); // 从列表中移除
  380. }
  381. } else
  382. {
  383. it++;
  384. }
  385. }
  386. }
  387. /* 销毁生成长文件线程 */
  388. {
  389. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  390. for(auto it = m_createLongWAVThreads.begin(); it != m_createLongWAVThreads.end(); )
  391. {
  392. BaseRecordThread* pThread = *it;
  393. if(pThread != nullptr)
  394. {
  395. auto threadState = pThread->getThreadInfo().threadState;
  396. if(EThreadState::State_Stopped == threadState ||
  397. EThreadState::State_Error == threadState )
  398. {
  399. auto pCreateLongWAVThread = dynamic_cast<CreateRecordFileThread*>(pThread);
  400. delete pCreateLongWAVThread; // 删除线程
  401. pThread = nullptr;
  402. it = m_createLongWAVThreads.erase(it); // 从列表中移除
  403. }
  404. } else
  405. {
  406. it++;
  407. }
  408. }
  409. }
  410. /* 销毁发送RTP数据线程 */
  411. {
  412. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  413. for(auto it = m_rtpSendThreads.begin(); it != m_rtpSendThreads.end(); )
  414. {
  415. BaseRecordThread* pThread = *it;
  416. if(pThread != nullptr)
  417. {
  418. auto threadState = pThread->getThreadInfo().threadState;
  419. if(EThreadState::State_Stopped == threadState ||
  420. EThreadState::State_Error == threadState )
  421. {
  422. auto pRtpSendThread = dynamic_cast<RTPOneRoadThread*>(pThread);
  423. delete pRtpSendThread; // 删除线程
  424. pThread = nullptr;
  425. it = m_rtpSendThreads.erase(it); // 从列表中移除
  426. }
  427. } else
  428. {
  429. it++;
  430. }
  431. }
  432. }
  433. /* 销毁标志位置为false */
  434. m_isDestroyeRecordThread.store(false);
  435. }
  436. /* 查找录音线程 */
  437. BaseRecordThread* ThreadManager::findRecordThread(EThreadType type, std::string pcmName)
  438. {
  439. switch(type)
  440. {
  441. case EThreadType::Type_RecordSrc: /* 录音线程 */
  442. {
  443. std::lock_guard<std::mutex> lock(m_mutexRecordThreads);
  444. for (auto& pThread : m_recordThreads)
  445. {
  446. if (pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName )
  447. {
  448. return pThread;
  449. }
  450. }
  451. }
  452. break;
  453. case EThreadType::Type_CreateWAV: /* 创建wav小文件和分离左右声道的线程 */
  454. {
  455. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  456. for (auto& pThread : m_createWAVThreads)
  457. {
  458. if (pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName )
  459. {
  460. return pThread;
  461. }
  462. }
  463. }
  464. break;
  465. case EThreadType::Type_CreateDB: /* 计算音量和反相的线程 */
  466. {
  467. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  468. for (auto& pThread : m_createDBThreads)
  469. {
  470. if (pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName )
  471. {
  472. return pThread;
  473. }
  474. }
  475. }
  476. break;
  477. case EThreadType::Type_CreateLongWAV: /* 创建长文件的线程 */
  478. {
  479. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  480. for (auto& pThread : m_createLongWAVThreads)
  481. {
  482. if (pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName )
  483. {
  484. return pThread;
  485. }
  486. }
  487. }
  488. break;
  489. case EThreadType::Type_AssignSrcData: /* 分派数据线程 */
  490. {
  491. std::lock_guard<std::mutex> lock(m_mutexAssignSrcDataThreads);
  492. for (auto& pThread : m_assignSrcDataThreads)
  493. {
  494. if (pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName )
  495. {
  496. return pThread;
  497. }
  498. }
  499. }
  500. break;
  501. case EThreadType::Type_RtpSend: /* RTP发送线程 */
  502. {
  503. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  504. for (auto& pThread : m_rtpSendThreads)
  505. {
  506. if (pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName )
  507. {
  508. return pThread;
  509. }
  510. }
  511. }
  512. break;
  513. default:
  514. SPDLOG_LOGGER_ERROR(m_logger, "{} 查找录音线程失败,未知线程类型: {}", m_logBase, static_cast<int>(type));
  515. return nullptr; // 未知线程类型
  516. }
  517. return nullptr;
  518. }
  519. /* 获取创建WAV线程指针 */
  520. CreateWAVThread* ThreadManager::getCreateWAVThread(std::string pcmName)
  521. {
  522. std::lock_guard<std::mutex> lock(m_mutexCreateWAVThreads);
  523. for(auto& pThread : m_createWAVThreads)
  524. {
  525. if(pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName)
  526. {
  527. return dynamic_cast<CreateWAVThread*>(pThread);
  528. }
  529. }
  530. return nullptr;
  531. }
  532. /* 获取创建音量值的线程 */
  533. CreateDBThread* ThreadManager::getCreateDBThread(std::string pcmName)
  534. {
  535. std::lock_guard<std::mutex> lock(m_mutexCreateDBThreads);
  536. for(auto& pThread : m_createDBThreads)
  537. {
  538. if(pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName)
  539. {
  540. return dynamic_cast<CreateDBThread*>(pThread);
  541. }
  542. }
  543. return nullptr;
  544. }
  545. /* 获取发送Rtp数据的线程 */
  546. RTPOneRoadThread* ThreadManager::getRtpSendThread(std::string pcmName)
  547. {
  548. std::lock_guard<std::mutex> lock(m_mutexRtpSendThreads);
  549. for(auto& pThread : m_rtpSendThreads)
  550. {
  551. const auto& threadInfo = pThread->getThreadInfo();
  552. if(threadInfo.cardRoadInfo.pcmInfo.strPCMName == pcmName)
  553. {
  554. return dynamic_cast<RTPOneRoadThread*>(pThread);
  555. }
  556. }
  557. return nullptr;
  558. }
  559. /* 获取录制报警文件的线程 */
  560. CreateRecordFileThread* ThreadManager::getCreateRecordFileThread(std::string pcmName)
  561. {
  562. std::lock_guard<std::mutex> lock(m_mutexCreateLongWAVThreads);
  563. for(auto& pThread : m_createLongWAVThreads)
  564. {
  565. if(pThread->getThreadInfo().cardRoadInfo.pcmInfo.strPCMName == pcmName)
  566. {
  567. return dynamic_cast<CreateRecordFileThread*>(pThread);
  568. }
  569. }
  570. return nullptr;
  571. }
  572. /* -------------------------------------------------------------------------------------------
  573. * 获取计算线程,如果该线程不存在则创建该线程
  574. * 当不需要此线程后,调用remove()函数去掉该线程
  575. * -------------------------------------------------------------------------------------------- */
  576. /* 获取一致性比对线程,线程不存在则创建 */
  577. // ConsistencyCompareThread* ThreadManager::getConsistencyCompareThread(const SoundCardRoadInfo_t& roadInfo1, const SoundCardRoadInfo_t& roadInfo2)
  578. // {
  579. // std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  580. // for(const auto pThread : m_listConsistencyCompareThreads)
  581. // {
  582. // if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  583. // {
  584. // return pThread; // 找到相同的线程,直接返回
  585. // }
  586. // }
  587. // /* 没找到该线程,创建新的线程 */
  588. // CompareItemRoadInfo_t item1;
  589. // item1.nCompareRoadNum = 1;
  590. // item1.scRoadInfo = roadInfo1;
  591. // CompareItemRoadInfo_t item2;
  592. // item2.nCompareRoadNum = 2;
  593. // item2.scRoadInfo = roadInfo2;
  594. // CalculateThreadInfo_t threadInfo;
  595. // threadInfo.compareItemInfo.mapRoad.insert(item1.nCompareRoadNum, item1);
  596. // threadInfo.compareItemInfo.mapRoad.insert(item2.nCompareRoadNum, item2);
  597. // ConsistencyCompareThread* newThread = new ConsistencyCompareThread(threadInfo);
  598. // if(newThread == nullptr)
  599. // {
  600. // SPDLOG_LOGGER_ERROR(m_logger, "创建一致性比对线程失败");
  601. // return nullptr; // 创建失败
  602. // }
  603. // CPPTP.add_task(&ConsistencyCompareThread::threadTask, newThread);
  604. // m_listConsistencyCompareThreads.push_back(newThread);
  605. // m_referCountConsistencyCompare++; // 引用计数加一
  606. // return newThread;
  607. // }
  608. /* 去掉线程,线程使用的计数减一,计数为0则销毁该线程 */
  609. // bool ThreadManager::removeConsistencyCompareThread(SoundCardRoadInfo_t& roadInfo1, SoundCardRoadInfo_t& roadInfo2)
  610. // {
  611. // std::lock_guard<std::mutex> lock(m_mutexConsistencyCompareThreads);
  612. // ConsistencyCompareThread* pThreadToRemove = nullptr;
  613. // for(const auto pThread : m_listConsistencyCompareThreads)
  614. // {
  615. // if(pThread->isRoadEqual(roadInfo1, roadInfo2))
  616. // {
  617. // pThreadToRemove = pThread; // 找到相同的线程,直接返回
  618. // break;
  619. // }
  620. // }
  621. // if(pThreadToRemove == nullptr)
  622. // {
  623. // SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程未找到", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  624. // roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  625. // return false; // 没找到该线程
  626. // }
  627. // m_referCountConsistencyCompare--; // 引用计数减一
  628. // if(m_referCountConsistencyCompare <= 0)
  629. // {
  630. // /* 停止线程,并一直等待其停止 */
  631. // pThreadToRemove->thread_stopBlock();
  632. // m_listConsistencyCompareThreads.remove(pThreadToRemove); // 从列表中移除
  633. // delete pThreadToRemove; // 删除线程
  634. // pThreadToRemove = nullptr;
  635. // m_referCountConsistencyCompare = 0; // 重置引用计数
  636. // SPDLOG_LOGGER_WARN(m_logger, "{}:{} - {}:{} 一致性比对线程已销毁", roadInfo1.strSoundCardName.toStdString(), roadInfo1.roadInfo.nRoadNum,
  637. // roadInfo2.strSoundCardName.toStdString(), roadInfo2.roadInfo.nRoadNum);
  638. // }
  639. // return true;
  640. // }
  641. /* 获取噪音检测线程 */
  642. // NoiseDetectThread* ThreadManager::getNoiseDetectThread(const OneSoundCardPCMInfo_t& roadInfo, int compareItemID)
  643. // {
  644. // std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  645. // NoiseDetectThread* pNoiseDetectThread = nullptr;
  646. // for(const auto pThread : m_listNoiseDetectThreads)
  647. // {
  648. // const OneSoundCardPCMInfo_t & threadRoadInfo = pThread->getRoadInfo();
  649. // if(threadRoadInfo.pcmInfo.strPCMName == roadInfo.pcmInfo.strPCMName)
  650. // {
  651. // pNoiseDetectThread = pThread;
  652. // break;
  653. // }
  654. // }
  655. // /* 判断引用计数是否需要增加 */
  656. // if(pNoiseDetectThread != nullptr)
  657. // {
  658. // /* 查找这个通道是否在列表中(实际上肯定在列表中) */
  659. // bool isExist = false;
  660. // for(auto it = m_mapNoiseDetectThreadRefCount.begin(); it != m_mapNoiseDetectThreadRefCount.end(); ++it)
  661. // {
  662. // if(it->first.roadInfo. == roadInfo.pcmInfo.strPCMName &&
  663. // it->first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  664. // {
  665. // bool isFound = false;
  666. // for(auto& id : it->second)
  667. // {
  668. // if(id == compareItemID)
  669. // {
  670. // isFound = true;
  671. // break; // 找到相同的对比项ID,直接返回
  672. // }
  673. // }
  674. // if(!isFound)
  675. // {
  676. // it->second.push_back(compareItemID); // 添加新的对比项ID
  677. // SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 噪音检测线程引用计数增加,当前计数: {}",
  678. // roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, it->second.size());
  679. // }
  680. // isExist = true;
  681. // break;
  682. // }
  683. // }
  684. // if(!isExist)
  685. // {
  686. // /* 不在引用计数的列表中,添加进入 */
  687. // m_mapNoiseDetectThreadRefCount[roadInfo].push_back(compareItemID);
  688. // }
  689. // return pNoiseDetectThread;
  690. // }
  691. // /* 没找到该线程,创建新的线程 */
  692. // CalculateThreadInfo_t threadInfo;
  693. // CompareItemRoadInfo_t item;
  694. // item.nCompareRoadNum = 1; // 假设噪音检测线程
  695. // item.scRoadInfo = roadInfo;
  696. // threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  697. // NoiseDetectThread* newThread = new NoiseDetectThread(threadInfo);
  698. // if(newThread == nullptr)
  699. // {
  700. // SPDLOG_LOGGER_ERROR(m_logger, "创建噪音检测线程失败");
  701. // return nullptr; // 创建失败
  702. // }
  703. // CPPTP.add_task(&NoiseDetectThread::threadTask, newThread);
  704. // m_listNoiseDetectThreads.push_back(newThread);
  705. // /* 不在引用计数的列表中,添加进入 */
  706. // m_mapNoiseDetectThreadRefCount[roadInfo].push_back(compareItemID);
  707. // return newThread;
  708. // }
  709. /* 去掉噪音检测线程,线程使用的计数减一,计数为0则销毁该线程 */
  710. // bool ThreadManager::removeNoiseDetectThread(const SoundCardRoadInfo_t& roadInfo, int compareItemID)
  711. // {
  712. // std::lock_guard<std::mutex> lock(m_mutexNoiseDetectThreads);
  713. // NoiseDetectThread* pThreadToRemove = nullptr;
  714. // for(const auto pThread : m_listNoiseDetectThreads)
  715. // {
  716. // const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  717. // if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  718. // threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  719. // {
  720. // pThreadToRemove = pThread; // 找到相同的线程,直接返回
  721. // break;
  722. // }
  723. // }
  724. // if(pThreadToRemove == nullptr)
  725. // {
  726. // SPDLOG_LOGGER_WARN(m_logger, "{}:{} 噪音检测线程未找到", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  727. // return false; // 没找到该线程
  728. // }
  729. // /* 引用计数减一 */
  730. // int useCount = 0;
  731. // for(auto it = m_mapNoiseDetectThreadRefCount.begin(); it != m_mapNoiseDetectThreadRefCount.end(); ++it)
  732. // {
  733. // if(it->first.nSoundCardNum == roadInfo.nSoundCardNum &&
  734. // it->first.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  735. // {
  736. // /* 将对比项ID从列表中删除 */
  737. // auto& compareItemList = it->second;
  738. // auto itemIt = std::find(compareItemList.begin(), compareItemList.end(), compareItemID);
  739. // if(itemIt != compareItemList.end())
  740. // {
  741. // compareItemList.erase(itemIt); // 移除该对比项ID
  742. // SPDLOG_LOGGER_TRACE(m_logger, "{}:{} 噪音检测线程引用计数减少,当前计数: {}",
  743. // roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum, compareItemList.size());
  744. // }
  745. // useCount = compareItemList.size(); // 获取当前引用计数
  746. // if(useCount <= 0)
  747. // {
  748. // m_mapNoiseDetectThreadRefCount.erase(it); // 如果引用计数为0,则从列表中移除
  749. // }
  750. // break; // 找到后退出循环
  751. // }
  752. // }
  753. // if(useCount <= 0)
  754. // {
  755. // SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程引用计数为0,准备销毁该线程",
  756. // roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  757. // pThreadToRemove->thread_stopBlock(); // 停止线程
  758. // m_listNoiseDetectThreads.remove(pThreadToRemove); // 从列表中移除
  759. // delete pThreadToRemove; // 删除线程
  760. // pThreadToRemove = nullptr;
  761. // SPDLOG_LOGGER_INFO(m_logger, "{}:{} 噪音检测线程已销毁", roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum);
  762. // }
  763. // return true;
  764. // }
  765. // /* 获取音量报警线程 */
  766. // CalculateDBPhaseThread* ThreadManager::getCalculateDBPhaseThread(const SoundCardRoadInfo_t& roadInfo)
  767. // {
  768. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  769. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  770. // {
  771. // const SoundCardRoadInfo_t& threadRoadInfo = pThread->getRoadInfo();
  772. // if(threadRoadInfo.nSoundCardNum == roadInfo.nSoundCardNum &&
  773. // threadRoadInfo.roadInfo.nRoadNum == roadInfo.roadInfo.nRoadNum)
  774. // {
  775. // return pThread; // 找到相同的线程,直接返回
  776. // }
  777. // }
  778. // /* 没找到该线程,创建新的线程 */
  779. // CompareItemRoadInfo_t item;
  780. // item.nCompareRoadNum = 1; // 假设音量报警线程
  781. // item.scRoadInfo = roadInfo;
  782. // threadInfo.compareItemInfo.mapRoad.insert(item.nCompareRoadNum, item);
  783. // CalculateDBPhaseThread* newThread = new CalculateDBPhaseThread(threadInfo);
  784. // if(newThread == nullptr)
  785. // {
  786. // SPDLOG_LOGGER_ERROR(m_logger, "创建音量报警线程失败");
  787. // return nullptr; // 创建失败
  788. // }
  789. // CPPTP.add_task(&CalculateDBPhaseThread::threadTask, newThread);
  790. // m_listCalculateDBPhaseThreads.push_back(newThread);
  791. // m_referCountCalculateDBPhase++; // 引用计数加一
  792. // return newThread;
  793. // }
  794. // /* 去掉音量报警线程,线程使用的计数减一,计数为0则销毁该线程 */
  795. // bool ThreadManager::removeCalculateDBPhaseThread(RoadNumberInfo_t& roadInfo)
  796. // {
  797. // std::lock_guard<std::mutex> lock(m_mutexCalculateDBPhaseThreads);
  798. // CalculateDBPhaseThread* pThreadToRemove = nullptr;
  799. // for(const auto pThread : m_listCalculateDBPhaseThreads)
  800. // {
  801. // if(pThread->getRoadInfo().roadID == roadInfo.roadID)
  802. // {
  803. // pThreadToRemove = pThread; // 找到相同的线程,直接返回
  804. // break;
  805. // }
  806. // }
  807. // if(pThreadToRemove == nullptr)
  808. // {
  809. // SPDLOG_LOGGER_WARN(m_logger, "{} 音量报警线程未找到", roadInfo.strRoadName);
  810. // return false; // 没找到该线程
  811. // }
  812. // m_referCountCalculateDBPhase--; // 引用计数减一
  813. // if(m_referCountCalculateDBPhase <= 0)
  814. // {
  815. // pThreadToRemove->thread_stop(); // 停止线程
  816. // m_listCalculateDBPhaseThreads.remove(pThreadToRemove); // 从列表中移除
  817. // delete pThreadToRemove; // 删除线程
  818. // pThreadToRemove = nullptr;
  819. // m_referCountCalculateDBPhase = 0; // 重置引用计数
  820. // SPDLOG_LOGGER_INFO(m_logger, "{} 音量报警线程已销毁", roadInfo.strRoadName);
  821. // }
  822. // return true;
  823. // }
  824. /* RTP线程函数,套一层壳 */
  825. void ThreadManager::thread_RTPSend(RecordThreadInfo_t& threadInfo)
  826. {
  827. RTPOneRoadThread* pRtpSendThread = new RTPOneRoadThread(threadInfo);
  828. if(pRtpSendThread == nullptr)
  829. {
  830. SPDLOG_ERROR("{}:{} 创建RTP发送线程失败", threadInfo.cardRoadInfo.strSoundCardName, threadInfo.cardRoadInfo.pcmInfo.strPCMName);
  831. return;
  832. }
  833. /* 先加入队列,再开启线程 */
  834. ThreadMan.m_mutexRtpSendThreads.lock();
  835. ThreadMan.m_rtpSendThreads.push_back(pRtpSendThread);
  836. ThreadMan.m_mutexRtpSendThreads.unlock();
  837. pRtpSendThread->thread_task();
  838. }