ThreadCompareItemManager.cpp 18 KB


  1. #include "ThreadCompareItemManager.h"
  2. #include "GlobalInfo.h"
  3. #include "SystemConfig.h"
  4. #include "CompareItemThread.h"
  5. #include "ThreadPool.h"
  6. #include "FromMQTT.h"
  7. #include "commonDefine.h"
  8. /* 给对比项套一层壳,这个函数就是新的线程,在里面new出新的对比项实例,防止Qt报线程归属权错误
  9. 在函数中将对比项实例插入到线程管理器中 */
  10. void ThreadCompareItemManager::thread_compareItem(CalculateThreadInfo_t threadInfo)
  11. {
  12. auto pThread = new CompareItemThread(threadInfo);
  13. if(pThread == nullptr)
  14. {
  15. SPDLOG_ERROR("ThreadCompareItemManager: 创建对比项线程失败");
  16. return;
  17. }
  18. CompareItemManager.addCompareItemThread(pThread);
  19. /* 启动线程,就会一直阻塞在这里了 */
  20. pThread->threadTask();
  21. }
  22. ThreadCompareItemManager::ThreadCompareItemManager()
  23. {
  24. }
  25. ThreadCompareItemManager::~ThreadCompareItemManager()
  26. {
  27. }
  28. /* 线程函数 */
  29. void ThreadCompareItemManager::thread_CompareItemManager()
  30. {
  31. m_logger = spdlog::get("CompareItemManager");
  32. if(m_logger == nullptr)
  33. {
  34. fmt::print("ThreadCompareItemManager: CompareItemManager Logger not found.\n");
  35. return;
  36. }
  37. /* 创建定时器和事件循环 */
  38. m_pEventLoop = new QEventLoop();
  39. m_pTimer = new QTimer();
  40. // std::function<void()> task = std::bind(&ThreadCompareItemManager::do_task, this);
  41. // m_pTimer->setInterval(10000);
  42. m_pTimer->setTimerType(Qt::PreciseTimer);
  43. m_pTimer->setSingleShot(false); // 设置为非单次定时器
  44. connect(m_pTimer, &QTimer::timeout, this, &ThreadCompareItemManager::do_task, Qt::DirectConnection);
  45. /* 初始化webapi */
  46. m_webAPIUrl = GInfo.webAPIUrl();
  47. m_webAPIID = GInfo.webAPIID();
  48. m_webAPIAppType = GInfo.webApiAppType();
  49. if(!m_fromWebAPI.initWebApi(m_webAPIUrl, m_webAPIID, m_webAPIAppType))
  50. {
  51. SPDLOG_LOGGER_ERROR(m_logger, "ThreadCompareItemManager: 初始化WebAPI失败");
  52. return;
  53. }
  54. /* 获取MQTT发布订阅 */
  55. m_pubTopic = GInfo.mqttPubTopicCompareItem();
  56. /* 获取基础配置,目前只获取一次 */
  57. updateBaseSettings();
  58. SPDLOG_LOGGER_INFO(m_logger, "开启对比项管理线程");
  59. m_pTimer->start(1000);
  60. m_pEventLoop->exec();
  61. SPDLOG_LOGGER_INFO(m_logger, "ThreadCompareItemManager: 线程结束");
  62. }
  63. /* 添加对比项实例 */
  64. void ThreadCompareItemManager::addCompareItemThread(CompareItemThread* pThread)
  65. {
  66. if(pThread == nullptr)
  67. {
  68. SPDLOG_LOGGER_ERROR(m_logger, "添加对比项线程失败,线程指针为空");
  69. return;
  70. }
  71. std::lock_guard<std::mutex> lock(m_mutexCompareItemThreads);
  72. int compareItemID = pThread->getThreadInfo().compareItemInfo.nID;
  73. if(m_mapThreads.contains(compareItemID))
  74. {
  75. SPDLOG_LOGGER_WARN(m_logger, "对比项线程已存在,ID: {}", compareItemID);
  76. return; // 对比项线程已存在
  77. }
  78. m_mapThreads.insert(compareItemID, pThread);
  79. SPDLOG_LOGGER_INFO(m_logger, "添加对比项线程成功,ID: {}", compareItemID);
  80. }
  81. /* 通过对比项ID和通道ID获取声卡通道信息 */
  82. SoundCardRoadInfo_t ThreadCompareItemManager::getSoundCardRoadInfo(int compareItemID, int roadNum)
  83. {
  84. std::lock_guard<std::mutex> lock(m_mutexCompareItemThreads);
  85. auto it = m_mapThreads.find(compareItemID);
  86. if(it == m_mapThreads.end())
  87. {
  88. SPDLOG_LOGGER_WARN(m_logger, "对比项线程不存在,ID: {}", compareItemID);
  89. return SoundCardRoadInfo_t();
  90. }
  91. auto compareInfo = it.value()->getThreadInfo().compareItemInfo;
  92. SoundCardRoadInfo_t roadInfo;
  93. for(const auto& road : compareInfo.mapRoad)
  94. {
  95. if(road.nCompareRoadNum == roadNum)
  96. {
  97. roadInfo = road.scRoadInfo;
  98. break;
  99. }
  100. }
  101. return roadInfo;
  102. }
  103. /* 任务函数 */
  104. void ThreadCompareItemManager::do_task()
  105. {
  106. // SPDLOG_LOGGER_ERROR(m_logger, "ThreadCompareItemManager: do_task() 函数被调用");
  107. /* 如果定时间隔小于10秒,则设置成10秒,一开始小是为了线程开启后立马执行一次 */
  108. if(m_pTimer->interval() < 10000)
  109. {
  110. // m_pTimer->stop();
  111. m_pTimer->setInterval(10000);
  112. // m_pTimer->start();
  113. }
  114. if(m_pFromMQTT == nullptr)
  115. {
  116. /* 初始化MQTT */
  117. initMQTT();
  118. }
  119. // SPDLOG_LOGGER_WARN(m_logger, "定时器是否在运行: {}, 触发间隔: {} ms", m_pTimer->isActive(), m_pTimer->interval());
  120. /* ------------------------------------------------------------------
  121. * 处理对比项信息
  122. * ------------------------------------------------------------------ */
  123. processCompareItemInfo();
  124. /* ------------------------------------------------------------------
  125. * 更新检测时段
  126. * ------------------------------------------------------------------ */
  127. processDetectPeriodInfo();
  128. /* ------------------------------------------------------------------
  129. * 更新对比项信息到MQTT
  130. * ------------------------------------------------------------------ */
  131. updateCompareItemInfoToMQTT();
  132. // SPDLOG_LOGGER_WARN(m_logger, "ThreadCompareItemManager: do_task() 函数执行完毕");
  133. }
  134. /* 更新基础设置信息,如数据库设置,噪音参数等 */
  135. bool ThreadCompareItemManager::updateBaseSettings()
  136. {
  137. /* 更新基础数据 */
  138. QMap<std::string, std::string> baseSettings;
  139. if(!m_fromWebAPI.getSystemConfig(baseSettings))
  140. {
  141. SPDLOG_LOGGER_ERROR(m_logger, "获取系统配置失败");
  142. return false;
  143. }
  144. /* 将获取到的配置转换成结构体 */
  145. SysConfig.parseConfigFromDatabase(baseSettings);
  146. /* 检测时段单独获取 */
  147. QMap<int, DetectPeriodConfig_t> mapDetectConfig;
  148. if(!m_fromWebAPI.getDetectPeriodConfig(mapDetectConfig))
  149. {
  150. SPDLOG_ERROR("获取对比项检测时段配置失败");
  151. return false;
  152. }
  153. SysConfig.setDetectPeriodConfig(mapDetectConfig);
  154. return true;
  155. }
  156. /* 对比项信息处理函数 */
  157. void ThreadCompareItemManager::processCompareItemInfo()
  158. {
  159. /* 获取对比项信息 */
  160. QList<CompareItemInfo_t> listNewItems;
  161. if(!m_fromWebAPI.getCompareItemInfo(listNewItems))
  162. {
  163. SPDLOG_LOGGER_DEBUG(m_logger, "ThreadCompareItemManager: 获取对比项失败");
  164. return;
  165. }
  166. checkCompareItemInfo(listNewItems, m_listCreateItems, m_listUpdateItems, m_listDeleteItems);
  167. SPDLOG_LOGGER_DEBUG(m_logger, "要退出的对比项个数: {}, 要更新的对比项个数: {}, 要创建的对比项个数: {}",
  168. m_listDeleteItems.size(), m_listUpdateItems.size(), m_listCreateItems.size());
  169. /* 先删除已消失的对比项信息 */
  170. processDeleteCompareItemThreads(m_listDeleteItems);
  171. /* 更新需要更新的线程 */
  172. updateRunningThreads(m_listUpdateItems);
  173. /* 再创建新的对比项线程 */
  174. createNewCompareItemThreads(m_listCreateItems);
  175. }
  176. /**
  177. * @brief 处理对比项信息,新获取的和已有的对比,会在这里更新 m_mapNowCompareItem 内容
  178. *
  179. * @param createList 创建列表
  180. * @param updateList 更新列表,根据对比项ID进行更新信息
  181. * @param deleteList 删除列表
  182. */
  183. void ThreadCompareItemManager::checkCompareItemInfo(QList<CompareItemInfo_t>& newList, QList<CompareItemInfo_t>& createList, QList<CompareItemInfo_t>& updateList, QList<int>& deleteList)
  184. {
  185. createList.clear();
  186. updateList.clear();
  187. deleteList.clear();
  188. m_mapNowCompareItem.clear();
  189. // QMap<int, CompareItemInfo_t> mapNowItems;
  190. /* 先从对比项线程中获取对比项信息 */
  191. for(auto it = m_mapThreads.begin(); it != m_mapThreads.end(); ++it)
  192. {
  193. BaseCalculateThread* pThread = it.value();
  194. if(pThread == nullptr)
  195. {
  196. continue;
  197. }
  198. /* 获取对比项信息 */
  199. CompareItemInfo_t itemInfo = pThread->getThreadInfo().compareItemInfo;
  200. m_mapNowCompareItem.insert(itemInfo.nID, itemInfo);
  201. }
  202. /* 遍历新获取的对比项信息,找出需要新增的对比项和需要更新的对比项 */
  203. for(const CompareItemInfo_t& item : newList)
  204. {
  205. if(!m_mapNowCompareItem.contains(item.nID))
  206. {
  207. /* 新对比项,添加到创建列表 */
  208. createList.append(item);
  209. } else
  210. {
  211. /* 已有对比项,检查是否需要更新 */
  212. const CompareItemInfo_t& existingItem = m_mapNowCompareItem.value(item.nID);
  213. /* 先对比基础信息 */
  214. if(!existingItem.isEqualBase(item))
  215. {
  216. /* 基础信息不同,需要更新 */
  217. updateList.append(item);
  218. continue;
  219. }
  220. /* 在对比对比项通道信息 */
  221. if(!existingItem.isEqualRoads(item))
  222. {
  223. /* 通道信息不同,需要更新 */
  224. updateList.append(item);
  225. continue;
  226. }
  227. }
  228. }
  229. /* 遍历当前对比项信息,找出需要删除的对比项 */
  230. for(auto it : m_mapNowCompareItem)
  231. {
  232. bool isFound = false;
  233. for(const CompareItemInfo_t& newItem : newList)
  234. {
  235. if(it.nID == newItem.nID)
  236. {
  237. isFound = true;
  238. break; // 找到对应的对比项,不需要删除
  239. }
  240. }
  241. if(!isFound)
  242. {
  243. /* 当前对比项不在新获取的对比项中,说明需要删除 */
  244. deleteList.append(it.nID);
  245. }
  246. }
  247. }
  248. /**
  249. * @brief 处理需要删除的对比项线程
  250. * 1、先处理已经停止的线程
  251. * 2、再将这次列表中的对比项ID对应的线程设置为停止状态,待到下次循环再删除已经停止完成的线程
  252. *
  253. * @param deleteList
  254. */
  255. void ThreadCompareItemManager::processDeleteCompareItemThreads(const QList<int>& deleteList)
  256. {
  257. /* 先处理已经停止运行的线程 */
  258. for(auto it = m_mapThreads.begin(); it != m_mapThreads.end();)
  259. {
  260. BaseCalculateThread* pThread = it.value();
  261. if(pThread == nullptr)
  262. {
  263. SPDLOG_LOGGER_WARN(m_logger, "对比项线程指针为空,即将删除该线程指针");
  264. it = m_mapThreads.erase(it);
  265. continue;
  266. }
  267. if(pThread->getThreadInfo().threadState == EThreadState::State_Stopped)
  268. {
  269. /* 线程已经停止,直接删除 */
  270. SPDLOG_LOGGER_INFO(m_logger, "对比项线程 {} 已经停止,准备删除", pThread->getThreadInfo().compareItemInfo.strName.toStdString());
  271. delete pThread;
  272. it = m_mapThreads.erase(it);
  273. continue;
  274. }
  275. ++it;
  276. }
  277. /* 停止本次需要停止的线程 */
  278. for(auto it : m_mapThreads)
  279. {
  280. int compareItemID = it->getThreadInfo().compareItemInfo.nID;
  281. if(deleteList.contains(compareItemID))
  282. {
  283. /* 设置线程停止标志 */
  284. it->stopThread();
  285. SPDLOG_LOGGER_INFO(m_logger, "对比项线程 {} 设置为停止状态", it->getThreadInfo().compareItemInfo.strName.toStdString());
  286. }
  287. }
  288. }
  289. /* 更新正在运行的线程信息 */
  290. void ThreadCompareItemManager::updateRunningThreads(const QList<CompareItemInfo_t>& updateList)
  291. {
  292. if(updateList.isEmpty())
  293. {
  294. return;
  295. }
  296. for(const CompareItemInfo_t& item : updateList)
  297. {
  298. auto it = m_mapThreads.find(item.nID);
  299. if(it == m_mapThreads.end())
  300. {
  301. SPDLOG_LOGGER_WARN(m_logger, "对比项线程 {} 不存在,无法更新信息", item.strName.toStdString());
  302. continue;
  303. }
  304. BaseCalculateThread* pThread = it.value();
  305. if(pThread == nullptr)
  306. {
  307. continue;
  308. }
  309. CalculateThreadInfo_t threadInfo;
  310. threadInfo.compareItemInfo = item;
  311. pThread->updateThreadInfo(threadInfo);
  312. }
  313. }
  314. /* 创建新的线程 */
  315. bool ThreadCompareItemManager::createNewCompareItemThreads(const QList<CompareItemInfo_t>& createList)
  316. {
  317. if(createList.isEmpty())
  318. {
  319. SPDLOG_LOGGER_DEBUG(m_logger, "没有新的对比项需要创建");
  320. return true;
  321. }
  322. for(auto& it : createList)
  323. {
  324. /* 创建新的对比项线程 */
  325. CalculateThreadInfo_t threadInfo;
  326. threadInfo.compareItemInfo = it;
  327. threadInfo.threadType = EThreadType::Type_CompareItem;
  328. threadInfo.threadState = EThreadState::State_Inited;
  329. CPPTP.add_task(&ThreadCompareItemManager::thread_compareItem, threadInfo);
  330. /* 创建线程对象 */
  331. // CompareItemThread* pThread = new CompareItemThread(threadInfo);
  332. // if(pThread == nullptr)
  333. // {
  334. // SPDLOG_LOGGER_ERROR(m_logger, "创建对比项线程 {} 失败", it.strName.toStdString());
  335. // return false;
  336. // }
  337. /* 启动线程 */
  338. // CPPTP.add_task(&CompareItemThread::threadTask, pThread);
  339. /* 添加到线程列表中 */
  340. // m_mapThreads.insert(it.nID, pThread);
  341. }
  342. return true;
  343. }
  344. /* 处理检测时段信息 */
  345. void ThreadCompareItemManager::processDetectPeriodInfo()
  346. {
  347. /* 获取计划信息 */
  348. QMap<int, DetectPeriodConfig_t> mapNewDetectConfig;
  349. if(!m_fromWebAPI.getDetectPeriodConfig(mapNewDetectConfig))
  350. {
  351. SPDLOG_LOGGER_ERROR(m_logger, "获取检测时段配置失败");
  352. return;
  353. }
  354. QMap<int, DetectPeriodConfig_t> mapUpdateDetectConfig;
  355. checkDetectPeriodInfo(mapNewDetectConfig, mapUpdateDetectConfig);
  356. /* 更新检测时段 */
  357. for(const auto& it : mapUpdateDetectConfig)
  358. {
  359. auto threadIt = m_mapThreads.find(it.nID);
  360. if(threadIt != m_mapThreads.end())
  361. {
  362. /* 找到对应的对比项线程,更新检测时段 */
  363. CompareItemThread* pThread = dynamic_cast<CompareItemThread*>(threadIt.value());
  364. if(pThread != nullptr)
  365. {
  366. pThread->setDetectPeriod(it);
  367. SPDLOG_LOGGER_TRACE(m_logger, "更新对比项 {} 的检测时段", pThread->getThreadInfo().compareItemInfo.strName.toStdString());
  368. }
  369. }
  370. }
  371. }
  372. /* 检查获取出更新的对比项信息 */
  373. void ThreadCompareItemManager::checkDetectPeriodInfo(QMap<int, DetectPeriodConfig_t> newDetectInfo, QMap<int, DetectPeriodConfig_t>& updateList)
  374. {
  375. for(const auto& it : newDetectInfo)
  376. {
  377. int compareItemID = it.nID;
  378. for(const auto& existingItem : m_mapDetectPeriod)
  379. {
  380. if(existingItem.nID == compareItemID)
  381. {
  382. /* 已经存在的对比项,检查是否需要更新 */
  383. if(existingItem == it)
  384. {
  385. /* 对比项信息相同,不需要更新 */
  386. continue;
  387. } else
  388. {
  389. /* 对比项信息不同,需要更新 */
  390. updateList.insert(compareItemID, it);
  391. }
  392. return;
  393. }
  394. }
  395. }
  396. }
  397. /* 更新对比项信息到MQTT */
  398. void ThreadCompareItemManager::updateCompareItemInfoToMQTT()
  399. {
  400. /* 生成发送的json文件 */
  401. nJson jsonArray = nJson::array();
  402. for(const auto& it : m_mapNowCompareItem)
  403. {
  404. nJson jsonItem;
  405. /* 对比项ID */
  406. jsonItem["compareItem_id"] = it.nID;
  407. /* 对比项名称 */
  408. jsonItem["compareItem_name"] = it.strName.toStdString();
  409. for(const auto& road : it.mapRoad)
  410. {
  411. nJson jsonRoad;
  412. /* 对比项通道编号和名称 */
  413. jsonRoad["road_num"] = road.nCompareRoadNum;
  414. jsonRoad["road_name"] = road.strCompareRoadName.toStdString();
  415. /* 通道使用的声卡编号 */
  416. jsonRoad["sound_card_num"] = road.scRoadInfo.nSoundCardNum;
  417. jsonRoad["sound_card_road_num"] = road.scRoadInfo.roadInfo.nRoadNum;
  418. jsonRoad["compareItem_roads"].push_back(jsonRoad);
  419. }
  420. /* 静音条件 */
  421. jsonItem["silence_switch"] = it.paramMute.isEnable;
  422. jsonItem["silence_threshold"] = it.paramMute.threshold.nThreshold;
  423. jsonItem["silence_duration"] = it.paramMute.nLen;
  424. jsonItem["silence_sensitivity"] = it.paramMute.nSensitivity;
  425. /* 过载条件 */
  426. jsonItem["overload_switch"] = it.paramOverload.isEnable;
  427. jsonItem["overload_threshold"] = it.paramOverload.threshold.nThreshold;
  428. jsonItem["overload_duration"] = it.paramOverload.nLen;
  429. jsonItem["overload_sensitivity"] = it.paramOverload.nSensitivity;
  430. /* 反相条件 */
  431. jsonItem["reverse_switch"] = it.paramPhase.isEnable;
  432. jsonItem["reverse_threshold"] = it.paramPhase.threshold.dThreshold;
  433. jsonItem["reverse_duration"] = it.paramPhase.nLen;
  434. jsonItem["reverse_sensitivity"] = it.paramPhase.nSensitivity;
  435. jsonArray.push_back(jsonItem);
  436. }
  437. /* 发送到MQTT */
  438. if(m_pFromMQTT->connectState() == QMQTT::ConnectionState::STATE_CONNECTED)
  439. {
  440. QByteArray jsonData = QByteArray::fromStdString(jsonArray.dump());
  441. if(!m_pFromMQTT->sendMessage(m_pubTopic, jsonData, 0, true))
  442. {
  443. SPDLOG_LOGGER_WARN(m_logger, "ThreadCompareItemManager: 发送对比项信息到MQTT失败");
  444. }else
  445. {
  446. SPDLOG_LOGGER_TRACE(m_logger, "ThreadCompareItemManager: 发送对比项信息到MQTT成功");
  447. }
  448. } else
  449. {
  450. SPDLOG_LOGGER_WARN(m_logger, "m_pFromMQTT 未连接到服务器,无法发送");
  451. }
  452. }
  453. /* 初始化MQTT */
  454. void ThreadCompareItemManager::initMQTT()
  455. {
  456. if(m_pFromMQTT == nullptr)
  457. {
  458. m_pFromMQTT = new FromMQTT();
  459. if(m_pFromMQTT == nullptr)
  460. {
  461. SPDLOG_LOGGER_ERROR(m_logger, "创建MQTT对象失败");
  462. return;
  463. }
  464. }
  465. /* 登陆MQTT */
  466. m_pFromMQTT->setIPAndPort(GInfo.mqttIP(), GInfo.mqttPort());
  467. // m_pFromMQTT->addSubcribe("LH_WEBINFO");
  468. m_pFromMQTT->setAutoReconnect(true);
  469. m_pFromMQTT->connectToServer();
  470. // connect(m_pFromMQTT, &FromMQTT::signal_recvMessage, [this](const QMQTT::Message& message) {
  471. // SPDLOG_LOGGER_WARN(m_logger, "--------------------- 接收到MQTT消息: {}", message.topic().toStdString());
  472. // SPDLOG_LOGGER_WARN(m_logger, "消息内容: {}", message.payload().toStdString());
  473. // });
  474. SPDLOG_LOGGER_INFO(m_logger, "☆ 连接MQTT服务器: {}:{}, 对比项信息订阅主题: {}", GInfo.mqttIP().toStdString(), GInfo.mqttPort(), m_pubTopic.toStdString());
  475. }