ThreadCompareItemManager.cpp 22 KB


  1. #include "ThreadCompareItemManager.h"
  2. #include "GlobalInfo.h"
  3. #include "SystemConfig.h"
  4. #include "CompareItemThread.h"
  5. #include "ThreadPool.h"
  6. #include "FromMQTT.h"
  7. #include "commonDefine.h"
  8. /* 给对比项套一层壳,这个函数就是新的线程,在里面new出新的对比项实例,防止Qt报线程归属权错误
  9. 在函数中将对比项实例插入到线程管理器中 */
  10. void ThreadCompareItemManager::thread_compareItem(CalculateThreadInfo_t threadInfo)
  11. {
  12. auto pThread = new CompareItemThread(threadInfo);
  13. if(pThread == nullptr)
  14. {
  15. SPDLOG_ERROR("ThreadCompareItemManager: 创建对比项线程失败");
  16. return;
  17. }
  18. CompareItemManager.addCompareItemThread(pThread);
  19. /* 启动线程,就会一直阻塞在这里了 */
  20. pThread->thread_task();
  21. }
  22. ThreadCompareItemManager::ThreadCompareItemManager()
  23. {
  24. }
  25. ThreadCompareItemManager::~ThreadCompareItemManager()
  26. {
  27. }
  28. /* 线程函数 */
  29. void ThreadCompareItemManager::thread_CompareItemManager()
  30. {
  31. m_logger = spdlog::get("CompareItemManager");
  32. if(m_logger == nullptr)
  33. {
  34. fmt::print("ThreadCompareItemManager: CompareItemManager Logger not found.\n");
  35. return;
  36. }
  37. /* 创建定时器和事件循环 */
  38. m_pEventLoop = new QEventLoop();
  39. m_pTimer = new QTimer();
  40. // m_pTimer->setInterval(10000);
  41. m_pTimer->setTimerType(Qt::PreciseTimer);
  42. m_pTimer->setSingleShot(false); // 设置为非单次定时器
  43. /* 这里要设置直接连接,才会在子线程中调用槽函数 */
  44. connect(m_pTimer, &QTimer::timeout, this, &ThreadCompareItemManager::do_task, Qt::DirectConnection);
  45. /* 初始化webapi */
  46. m_webAPIUrl = GInfo.webAPIUrl();
  47. m_webAPIID = GInfo.webAPIID();
  48. m_webAPIAppType = GInfo.webApiAppType();
  49. if(!m_fromWebAPI.initWebApi(m_webAPIUrl, m_webAPIID, m_webAPIAppType))
  50. {
  51. SPDLOG_LOGGER_ERROR(m_logger, "ThreadCompareItemManager: 初始化WebAPI失败");
  52. return;
  53. }
  54. /* 获取MQTT发布订阅 */
  55. m_pubTopic = GInfo.mqttPubTopicCompareItem();
  56. initMQTT();
  57. /* 获取基础配置,目前只获取一次 */
  58. updateBaseSettings();
  59. SPDLOG_LOGGER_INFO(m_logger, "开启对比项管理线程");
  60. m_pTimer->start(2000);
  61. /* 连接mqtt服务 */
  62. m_pFromMQTT->connectToServer();
  63. m_pEventLoop->exec();
  64. SPDLOG_LOGGER_INFO(m_logger, "ThreadCompareItemManager: 线程结束");
  65. }
  66. /* 通过对比项ID和通道ID获取声卡通道信息 */
  67. OneSoundCardPCMInfo_t ThreadCompareItemManager::getSoundCardRoadInfo(int compareItemID, int roadNum)
  68. {
  69. std::lock_guard<std::mutex> lock(m_mutexCompareItemThreads);
  70. auto it = m_mapThreads.find(compareItemID);
  71. if(it == m_mapThreads.end())
  72. {
  73. SPDLOG_LOGGER_WARN(m_logger, "对比项线程不存在,ID: {}", compareItemID);
  74. return OneSoundCardPCMInfo_t();
  75. }
  76. auto compareInfo = it.value()->getThreadInfo().compareItemInfo;
  77. OneSoundCardPCMInfo_t roadInfo;
  78. for(const auto& road : compareInfo.mapRoad)
  79. {
  80. if(road.nCompareRoadNum == roadNum)
  81. {
  82. roadInfo = road.scRoadInfo;
  83. break;
  84. }
  85. }
  86. return roadInfo;
  87. }
  88. /* 任务函数 */
  89. void ThreadCompareItemManager::do_task()
  90. {
  91. /* 如果定时间隔小于10秒,则设置成10秒,一开始小是为了线程开启后立马执行一次 */
  92. if(m_pTimer->interval() < 2000)
  93. {
  94. m_pTimer->setInterval(2000);
  95. }
  96. /* 判断MQTT是否连接成功,未连接则再次连接 */
  97. if(m_pFromMQTT->connectState() != QMQTT::ConnectionState::STATE_CONNECTED)
  98. {
  99. SPDLOG_LOGGER_WARN(m_logger, "MQTT未连接,尝试重新连接");
  100. m_pFromMQTT->connectToServer();
  101. }
  102. /* ------------------------------------------------------------------
  103. * 处理对比项信息
  104. * ------------------------------------------------------------------ */
  105. processCompareItemInfo();
  106. /* ------------------------------------------------------------------
  107. * 更新检测时段
  108. * ------------------------------------------------------------------ */
  109. processDetectPeriodInfo();
  110. /* ------------------------------------------------------------------
  111. * 更新对比项信息到MQTT
  112. * ------------------------------------------------------------------ */
  113. updateCompareItemInfoToMQTT();
  114. }
  115. /* 添加对比项实例 */
  116. void ThreadCompareItemManager::addCompareItemThread(CompareItemThread* pThread)
  117. {
  118. if(pThread == nullptr)
  119. {
  120. SPDLOG_LOGGER_ERROR(m_logger, "添加对比项线程失败,线程指针为空");
  121. return;
  122. }
  123. std::lock_guard<std::mutex> lock(m_mutexCompareItemThreads);
  124. int compareItemID = pThread->getThreadInfo().compareItemInfo.nID;
  125. if(m_mapThreads.contains(compareItemID))
  126. {
  127. SPDLOG_LOGGER_WARN(m_logger, "对比项线程已存在,ID: {}", compareItemID);
  128. return; // 对比项线程已存在
  129. }
  130. m_mapThreads.insert(compareItemID, pThread);
  131. SPDLOG_LOGGER_INFO(m_logger, "添加对比项线程成功,ID: {}", compareItemID);
  132. }
  133. /* 更新基础设置信息,如数据库设置,噪音参数等 */
  134. bool ThreadCompareItemManager::updateBaseSettings()
  135. {
  136. /* 更新基础数据 */
  137. QMap<std::string, std::string> baseSettings;
  138. if(!m_fromWebAPI.getSystemConfig(baseSettings))
  139. {
  140. SPDLOG_LOGGER_ERROR(m_logger, "获取系统配置失败");
  141. return false;
  142. }
  143. /* 将获取到的配置转换成结构体 */
  144. SysConfig.parseConfigFromDatabase(baseSettings);
  145. /* 检测时段单独获取 */
  146. QMap<int, DetectPeriodConfig_t> mapDetectConfig;
  147. if(!m_fromWebAPI.getDetectPeriodConfig(mapDetectConfig))
  148. {
  149. SPDLOG_ERROR("获取对比项检测时段配置失败");
  150. return false;
  151. }
  152. SysConfig.setDetectPeriodConfig(mapDetectConfig);
  153. return true;
  154. }
  155. /* 对比项信息处理函数 */
  156. void ThreadCompareItemManager::processCompareItemInfo()
  157. {
  158. /* 获取对比项信息 */
  159. QList<CompareItemInfo_t> listNewItems;
  160. if(!m_fromWebAPI.getCompareItemInfo(listNewItems))
  161. {
  162. SPDLOG_LOGGER_DEBUG(m_logger, "ThreadCompareItemManager: 获取对比项失败");
  163. return;
  164. }
  165. checkCompareItemInfo(listNewItems, m_listCreateItems, m_listDeleteItems);
  166. SPDLOG_LOGGER_DEBUG(m_logger, "要退出的对比项个数: {}, 要创建的对比项个数: {}",
  167. m_listDeleteItems.size(), m_listCreateItems.size());
  168. /* 先删除已消失的对比项信息 */
  169. processDeleteCompareItemThreads(m_listDeleteItems);
  170. /* 更新需要更新的线程 */
  171. // updateRunningThreads(m_listUpdateItems);
  172. /* 再创建新的对比项线程 */
  173. createNewCompareItemThreads(m_listCreateItems);
  174. SPDLOG_LOGGER_DEBUG(m_logger, "当前对比项个数: {}", m_mapNowCompareItem.size());
  175. }
  176. /**
  177. * @brief 处理对比项信息,新获取的和已有的对比,会在这里更新 m_mapNowCompareItem 内容
  178. *
  179. * @param createList 创建列表
  180. * @param updateList 更新列表,根据对比项ID进行更新信息,没有更新列表了,对比项有更新就删除重建
  181. * @param deleteList 删除列表
  182. */
  183. void ThreadCompareItemManager::checkCompareItemInfo(QList<CompareItemInfo_t>& newList, QList<CompareItemInfo_t>& createList, QList<int>& deleteList)
  184. {
  185. createList.clear();
  186. // updateList.clear();
  187. deleteList.clear();
  188. m_mapNowCompareItem.clear();
  189. // QMap<int, CompareItemInfo_t> mapNowItems;
  190. /* 先从对比项线程中获取对比项信息 */
  191. for(auto it = m_mapThreads.begin(); it != m_mapThreads.end(); ++it)
  192. {
  193. BaseCalculateThread* pThread = it.value();
  194. if(pThread == nullptr)
  195. {
  196. continue;
  197. }
  198. /* 获取对比项信息 */
  199. CompareItemInfo_t itemInfo = pThread->getThreadInfo().compareItemInfo;
  200. m_mapNowCompareItem.insert(itemInfo.nID, itemInfo);
  201. }
  202. /* 遍历新获取的对比项信息,找出需要新增的对比项和需要更新的对比项 */
  203. for(const CompareItemInfo_t& item : newList)
  204. {
  205. if(!m_mapNowCompareItem.contains(item.nID))
  206. {
  207. /* 判断这个是否被启用 */
  208. if(item.isEnable == false)
  209. {
  210. continue;
  211. }
  212. /* 新对比项,添加到创建列表 */
  213. createList.append(item);
  214. } else
  215. {
  216. /* 已有对比项,检查是否需要更新 */
  217. const CompareItemInfo_t& existingItem = m_mapNowCompareItem.value(item.nID);
  218. /* 先对比基础信息 */
  219. if(!existingItem.isEqualBase(item))
  220. {
  221. /* 基础信息不同,需要更新,也可能是被禁用了导致的不同 */
  222. if(item.isEnable == false)
  223. {
  224. /* 如果新对比项被禁用,则需要删除 */
  225. deleteList.append(item.nID);
  226. continue;
  227. }
  228. // updateList.append(item);
  229. deleteList.append(item.nID); // 先删除再创建
  230. createList.append(item); // 添加到创建列表
  231. continue;
  232. }
  233. /* 在对比对比项通道信息 */
  234. if(!existingItem.isEqualRoads(item))
  235. {
  236. /* 通道信息不同,需要更新 */
  237. // updateList.append(item);
  238. deleteList.append(item.nID); // 先删除再创建
  239. createList.append(item); // 添加到创建列表
  240. continue;
  241. }
  242. }
  243. }
  244. /* 遍历当前对比项信息,找出需要删除的对比项 */
  245. for(auto it : m_mapNowCompareItem)
  246. {
  247. bool isFound = false;
  248. for(const CompareItemInfo_t& newItem : newList)
  249. {
  250. if(it.nID == newItem.nID)
  251. {
  252. /* 找到对应的对比项,跳过 */
  253. isFound = true;
  254. break;
  255. }
  256. }
  257. if(!isFound)
  258. {
  259. /* 当前对比项不在新获取的对比项中,说明需要删除 */
  260. deleteList.append(it.nID);
  261. }
  262. }
  263. }
  264. /**
  265. * @brief 处理需要删除的对比项线程
  266. * 1、先处理已经停止的线程,初始化失败的对比项会设置错误状态
  267. * 2、再将这次列表中的对比项ID对应的线程设置为停止状态,待到下次循环再删除已经停止完成的线程
  268. *
  269. * @param deleteList
  270. */
  271. void ThreadCompareItemManager::processDeleteCompareItemThreads(const QList<int>& deleteList)
  272. {
  273. /* 先处理已经停止运行的线程,是上次循环停止的线程 */
  274. for(auto it = m_mapThreads.begin(); it != m_mapThreads.end();)
  275. {
  276. BaseCalculateThread* pThread = it.value();
  277. if(pThread == nullptr)
  278. {
  279. SPDLOG_LOGGER_WARN(m_logger, "对比项线程指针为空,即将删除该线程指针");
  280. it = m_mapThreads.erase(it);
  281. continue;
  282. }
  283. if(pThread->getThreadInfo().threadState == EThreadState::State_Stopped ||
  284. pThread->getThreadInfo().threadState == EThreadState::State_Error )
  285. {
  286. /* 确保线程已经停止 */
  287. pThread->thread_stop_block();
  288. SPDLOG_LOGGER_INFO(m_logger, "对比项线程 {} 已经停止,准备删除", pThread->getThreadInfo().compareItemInfo.strName.toStdString());
  289. delete pThread;
  290. it = m_mapThreads.erase(it);
  291. continue;
  292. }
  293. ++it;
  294. }
  295. /* 停止本次需要停止的线程 */
  296. for(auto it = m_mapThreads.begin(); it != m_mapThreads.end(); )
  297. {
  298. int compareItemID = it.value()->getThreadInfo().compareItemInfo.nID;
  299. if(deleteList.contains(compareItemID))
  300. {
  301. std::string compareItemName = it.value()->getThreadInfo().compareItemInfo.strName.toStdString();
  302. /* 设置线程停止标志,阻塞等待线程停止,然后再删除 */
  303. it.value()->thread_stop_block();
  304. delete it.value();
  305. it.value() = nullptr; // 设置为nullptr,防止悬空指针
  306. it = m_mapThreads.erase(it); // 从列表中删除
  307. SPDLOG_LOGGER_INFO(m_logger, "对比项线程 {} 已删除", compareItemName);
  308. }else {
  309. ++it;
  310. }
  311. }
  312. }
  313. /* 更新正在运行的线程信息 */
  314. void ThreadCompareItemManager::updateRunningThreads(const QList<CompareItemInfo_t>& updateList)
  315. {
  316. if(updateList.isEmpty())
  317. {
  318. return;
  319. }
  320. for(const CompareItemInfo_t& item : updateList)
  321. {
  322. auto it = m_mapThreads.find(item.nID);
  323. if(it == m_mapThreads.end())
  324. {
  325. SPDLOG_LOGGER_WARN(m_logger, "对比项线程 {} 不存在,无法更新信息", item.strName.toStdString());
  326. continue;
  327. }
  328. BaseCalculateThread* pThread = it.value();
  329. if(pThread == nullptr)
  330. {
  331. continue;
  332. }
  333. CalculateThreadInfo_t threadInfo;
  334. threadInfo.compareItemInfo = item;
  335. pThread->updateThreadInfo(threadInfo);
  336. }
  337. }
  338. /* 创建新的线程 */
  339. bool ThreadCompareItemManager::createNewCompareItemThreads(const QList<CompareItemInfo_t>& createList)
  340. {
  341. if(createList.isEmpty())
  342. {
  343. // SPDLOG_LOGGER_DEBUG(m_logger, "没有新的对比项需要创建");
  344. return true;
  345. }
  346. for(auto& it : createList)
  347. {
  348. /* 创建新的对比项线程 */
  349. CalculateThreadInfo_t threadInfo;
  350. threadInfo.compareItemInfo = it;
  351. threadInfo.threadType = EThreadType::Type_CompareItem;
  352. threadInfo.threadState.store(EThreadState::State_Inited);
  353. CPPTP.add_task(&ThreadCompareItemManager::thread_compareItem, threadInfo);
  354. /* 创建线程对象 */
  355. // CompareItemThread* pThread = new CompareItemThread(threadInfo);
  356. // if(pThread == nullptr)
  357. // {
  358. // SPDLOG_LOGGER_ERROR(m_logger, "创建对比项线程 {} 失败", it.strName.toStdString());
  359. // return false;
  360. // }
  361. /* 启动线程 */
  362. // CPPTP.add_task(&CompareItemThread::threadTask, pThread);
  363. /* 添加到线程列表中 */
  364. // m_mapThreads.insert(it.nID, pThread);
  365. }
  366. return true;
  367. }
  368. /* 处理检测时段信息 */
  369. void ThreadCompareItemManager::processDetectPeriodInfo()
  370. {
  371. /* 获取计划信息 */
  372. QMap<int, DetectPeriodConfig_t> mapNewDetectConfig;
  373. if(!m_fromWebAPI.getDetectPeriodConfig(mapNewDetectConfig))
  374. {
  375. SPDLOG_LOGGER_ERROR(m_logger, "获取检测时段配置失败");
  376. return;
  377. }
  378. QMap<int, DetectPeriodConfig_t> mapUpdateDetectConfig;
  379. checkDetectPeriodInfo(m_mapDetectPeriod, mapNewDetectConfig, mapUpdateDetectConfig);
  380. /* 更新检测时段 */
  381. for(const auto& it : mapUpdateDetectConfig)
  382. {
  383. auto threadIt = m_mapThreads.find(it.nID);
  384. if(threadIt != m_mapThreads.end())
  385. {
  386. /* 找到对应的对比项线程,更新检测时段 */
  387. CompareItemThread* pThread = dynamic_cast<CompareItemThread*>(threadIt.value());
  388. if(pThread != nullptr)
  389. {
  390. pThread->setDetectPeriod(it);
  391. SPDLOG_LOGGER_TRACE(m_logger, "更新对比项 {} 的检测时段", pThread->getThreadInfo().compareItemInfo.strName.toStdString());
  392. }
  393. }
  394. }
  395. /* 更新当前检测时段信息 */
  396. m_mapDetectPeriod = mapNewDetectConfig;
  397. }
  398. /* 检查获取出更新的对比项信息 */
  399. void ThreadCompareItemManager::checkDetectPeriodInfo(QMap<int, DetectPeriodConfig_t>& nowDetectInfo, QMap<int, DetectPeriodConfig_t> newDetectInfo, QMap<int, DetectPeriodConfig_t>& updateList)
  400. {
  401. for(const auto& it : newDetectInfo)
  402. {
  403. int compareItemID = it.nID;
  404. bool isFound = false;
  405. for(const auto& existingItem : nowDetectInfo)
  406. {
  407. if(existingItem.nID == compareItemID)
  408. {
  409. isFound = true;
  410. /* 已经存在的对比项,检查是否需要更新 */
  411. if(existingItem == it)
  412. {
  413. /* 对比项信息相同,不需要更新 */
  414. continue;
  415. } else
  416. {
  417. /* 对比项信息不同,需要更新 */
  418. updateList.insert(compareItemID, it);
  419. }
  420. return;
  421. }
  422. }
  423. if(isFound == false)
  424. {
  425. /* 新对比项,添加到更新列表 */
  426. updateList.insert(compareItemID, it);
  427. }
  428. }
  429. }
  430. /* 更新对比项信息到MQTT */
  431. void ThreadCompareItemManager::updateCompareItemInfoToMQTT()
  432. {
  433. QMap<int, CompareItemInfo_t> newMap;
  434. /* 获取当前的对比项信息 */
  435. for(auto it = m_mapThreads.begin(); it != m_mapThreads.end(); ++it)
  436. {
  437. BaseCalculateThread* pThread = it.value();
  438. if(pThread == nullptr)
  439. {
  440. continue;
  441. }
  442. /* 获取对比项信息 */
  443. CompareItemInfo_t itemInfo = pThread->getThreadInfo().compareItemInfo;
  444. newMap.insert(itemInfo.nID, itemInfo);
  445. }
  446. /* 和之前的对比,对比项是否有更新,有更新则发送 */
  447. bool isUpdated = false;
  448. if(newMap.size() != m_mapMQTTItemInfo.size())
  449. {
  450. isUpdated = true;
  451. } else
  452. {
  453. /* 进一步对比相信信息 */
  454. for(const auto& newItem : newMap)
  455. {
  456. auto nowIt = m_mapMQTTItemInfo.find(newItem.nID);
  457. if(nowIt == m_mapMQTTItemInfo.end())
  458. {
  459. isUpdated = true;
  460. break;
  461. }
  462. /* 比较对比项通道数量和对比项名称 */
  463. if(nowIt.value().strName != newItem.strName || nowIt.value().mapRoad.size() != newItem.mapRoad.size())
  464. {
  465. isUpdated = true;
  466. break;
  467. }
  468. /* 挨个比较对比项通道信息 */
  469. for(const auto& road : newItem.mapRoad)
  470. {
  471. auto roadIt = nowIt.value().mapRoad.find(road.nCompareRoadNum);
  472. if(roadIt == nowIt.value().mapRoad.end() || roadIt.value().strCompareRoadName != road.strCompareRoadName)
  473. {
  474. isUpdated = true;
  475. break;
  476. }
  477. }
  478. }
  479. }
  480. /* 服务启动,可能对比项是空的,上面检查结果为false,但是也需要发送一次清空对比项 */
  481. if(isUpdated || m_isOneSend.load())
  482. {
  483. sendCompareItemInfoToMQTT(newMap);
  484. m_mapMQTTItemInfo = newMap;
  485. }
  486. m_isOneSend.store(false);
  487. }
  488. /* 发送到MQTT */
  489. void ThreadCompareItemManager::sendCompareItemInfoToMQTT(const QMap<int, CompareItemInfo_t>& mapCompareItem)
  490. {
  491. /* 生成发送的json文件 */
  492. nJson jsonArray = nJson::array();
  493. for(const auto& it : mapCompareItem)
  494. {
  495. /* 判断这个对比项是否启用,不启用就跳过 */
  496. if(it.isEnable == false)
  497. {
  498. continue;
  499. }
  500. nJson jsonItem;
  501. /* 对比项ID */
  502. jsonItem["compareItem_id"] = it.nID;
  503. /* 对比项名称 */
  504. jsonItem["compareItem_name"] = it.strName.toStdString();
  505. nJson josnItemRoads = nJson::array();
  506. for(const auto& road : it.mapRoad)
  507. {
  508. nJson jsonRoad;
  509. /* 对比项通道编号和名称 */
  510. jsonRoad["road_num"] = road.nCompareRoadNum;
  511. jsonRoad["road_name"] = road.strCompareRoadName.toStdString();
  512. /* 通道使用的声卡编号 */
  513. // jsonRoad["sound_card_num"] = road.scRoadInfo.nSoundCardNum;
  514. // jsonRoad["sound_card_road_num"] = road.scRoadInfo.roadInfo.nRoadNum;
  515. josnItemRoads.push_back(jsonRoad);
  516. }
  517. jsonItem["compareItem_roads"] = josnItemRoads;
  518. // /* 静音条件 */
  519. // jsonItem["silence_switch"] = it.paramMute.isEnable;
  520. // jsonItem["silence_threshold"] = it.paramMute.threshold.nThreshold;
  521. // jsonItem["silence_duration"] = it.paramMute.nLen;
  522. // jsonItem["silence_sensitivity"] = it.paramMute.nSensitivity;
  523. // /* 过载条件 */
  524. // jsonItem["overload_switch"] = it.paramOverload.isEnable;
  525. // jsonItem["overload_threshold"] = it.paramOverload.threshold.nThreshold;
  526. // jsonItem["overload_duration"] = it.paramOverload.nLen;
  527. // jsonItem["overload_sensitivity"] = it.paramOverload.nSensitivity;
  528. // /* 反相条件 */
  529. // jsonItem["reverse_switch"] = it.paramPhase.isEnable;
  530. // jsonItem["reverse_threshold"] = it.paramPhase.threshold.dThreshold;
  531. // jsonItem["reverse_duration"] = it.paramPhase.nLen;
  532. // jsonItem["reverse_sensitivity"] = it.paramPhase.nSensitivity;
  533. jsonArray.push_back(jsonItem);
  534. }
  535. /* 发送到MQTT */
  536. if(m_pFromMQTT->connectState() == QMQTT::ConnectionState::STATE_CONNECTED)
  537. {
  538. QByteArray jsonData = QByteArray::fromStdString(jsonArray.dump());
  539. if(!m_pFromMQTT->sendMessage(m_pubTopic, jsonData, 0, true))
  540. {
  541. SPDLOG_LOGGER_WARN(m_logger, "ThreadCompareItemManager: 发送对比项信息到MQTT失败");
  542. }else
  543. {
  544. SPDLOG_LOGGER_TRACE(m_logger, "ThreadCompareItemManager: 发送对比项信息到MQTT成功");
  545. }
  546. } else
  547. {
  548. SPDLOG_LOGGER_WARN(m_logger, "m_pFromMQTT 未连接到服务器,无法发送");
  549. }
  550. }
  551. /* 初始化MQTT */
  552. void ThreadCompareItemManager::initMQTT()
  553. {
  554. if(m_pFromMQTT == nullptr)
  555. {
  556. m_pFromMQTT = new FromMQTT();
  557. if(m_pFromMQTT == nullptr)
  558. {
  559. SPDLOG_LOGGER_ERROR(m_logger, "创建MQTT对象失败");
  560. return;
  561. }
  562. }
  563. /* 登陆MQTT */
  564. m_pFromMQTT->setIPAndPort(GInfo.mqttIP(), GInfo.mqttPort());
  565. // m_pFromMQTT->addSubcribe("LH_WEBINFO");
  566. m_pFromMQTT->setAutoReconnect(true);
  567. // m_pFromMQTT->connectToServer();
  568. // connect(m_pFromMQTT, &FromMQTT::signal_recvMessage, [this](const QMQTT::Message& message) {
  569. // SPDLOG_LOGGER_WARN(m_logger, "--------------------- 接收到MQTT消息: {}", message.topic().toStdString());
  570. // SPDLOG_LOGGER_WARN(m_logger, "消息内容: {}", message.payload().toStdString());
  571. // });
  572. SPDLOG_LOGGER_INFO(m_logger, "☆ 连接MQTT服务器: {}:{}, 对比项信息订阅主题: {}", GInfo.mqttIP().toStdString(), GInfo.mqttPort(), m_pubTopic.toStdString());
  573. }