FromMQTT.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. #include "spdlog/spdlog.h"
  2. #include "FromMQTT.h"
  3. #include "commonDefine.h"
  4. #include <QApplication>
  5. #include <QSettings>
  6. #include <QTimer>
  7. #include <QEventLoop>
  8. FromMQTT::FromMQTT(QObject* parent)
  9. {
  10. }
  11. FromMQTT::~FromMQTT()
  12. {
  13. }
  14. /* 传入MQTT地址和端口 */
  15. void FromMQTT::setAddrInfo(const QString& ip, int port)
  16. {
  17. m_mqttIP = ip;
  18. m_mqttPort = port;
  19. m_isReadConfig.store(false);
  20. }
  21. /* 初始化MQTT */
  22. bool FromMQTT::initMQTT()
  23. {
  24. if(m_isReadConfig.load())
  25. {
  26. readConfig();
  27. }
  28. /* 连接MQTT */
  29. setIPAndPort(m_mqttIP, m_mqttPort);
  30. addSubcribe(m_topic_WebAPI, 0); // 订阅WebAPI主题
  31. addSubcribe(m_topic_UpdatePlan); // 订阅更新计划主题
  32. /* 设置自动重连 */
  33. setAutoReconnect(true);
  34. if(!m_client.isConnectedToHost())
  35. {
  36. connectToServer();
  37. }
  38. /* 等待mqtt回复webapi信息(不阻塞UI,收到立即退出)*/
  39. waitForWebInfo(m_timeOut);
  40. SPDLOG_INFO("WebAPI URL: {}", m_webAPIUrl.toStdString());
  41. SPDLOG_INFO("WebAPI DBID: {}", m_webAPIID.toStdString());
  42. if(m_webAPIUrl.isEmpty())
  43. {
  44. SPDLOG_ERROR("接收MQTT消息超时,获取WebAPI信息失败");
  45. return false;
  46. }
  47. /* 开启MQTT订阅 */
  48. // addSubcribe(m_topic_UpdatePlan);
  49. return true;
  50. }
  51. /* 读取配置文件 */
  52. void FromMQTT::readConfig()
  53. {
  54. QString strConfigPath = QApplication::applicationDirPath() + m_configName;
  55. QSettings setting(strConfigPath, QSettings::IniFormat);
  56. setting.beginGroup("MQTTINFO");
  57. m_mqttIP = setting.value("MQTTAddr", "").toString();
  58. m_mqttPort = setting.value("MQTTPort", 1883).toInt();
  59. m_timeOut = setting.value("MQTTTimeOut", 20000).toInt();
  60. setting.endGroup();
  61. }
  62. /* 接收到消息,子类继承这个解析消息数据 */
  63. void FromMQTT::recvMessage(const QMQTT::Message& message)
  64. {
  65. SPDLOG_TRACE("接收到MQTT消息: {}", message.payload().toStdString());
  66. QString payload = QString::fromUtf8(message.payload());
  67. if(message.topic() == m_topic_WebAPI)
  68. {
  69. /* 处理WebAPI消息 */
  70. try
  71. {
  72. nJson jsonMsg = nJson::parse(payload.toStdString());
  73. m_webAPIUrl = jsonMsg["webaddr"].is_null() ? "" : jsonMsg["webaddr"].get<std::string>().c_str();
  74. m_webAPIID = jsonMsg["serverid"].is_null() ? "" : jsonMsg["serverid"].get<std::string>().c_str();
  75. m_isGetWebInfoSuccess.store(true);
  76. emit signal_webInfoReady(); // 通知等待逻辑退出
  77. }nJsonCatchNoReturn
  78. SPDLOG_TRACE("WebAPI信息获取成功: URL = {}, ID = {}", m_webAPIUrl.toStdString(), m_webAPIID.toStdString());
  79. }
  80. else if(message.topic() == m_topic_UpdatePlan)
  81. {
  82. /* 处理管理站消息 */
  83. try
  84. {
  85. nJson jsonMsg = nJson::parse(payload.toStdString());
  86. SPDLOG_INFO("MQTT接收到刷新报警时段消息");
  87. emit signal_updatePlan();
  88. }nJsonCatchNoReturn
  89. }
  90. }
  91. /* 等待WebAPI信息到达,使用事件循环+信号的方式,不阻塞主UI刷新 */
  92. bool FromMQTT::waitForWebInfo(int timeoutMs)
  93. {
  94. if(m_isGetWebInfoSuccess.load())
  95. return true; // 已经有了
  96. QEventLoop loop;
  97. QTimer timer;
  98. timer.setSingleShot(true);
  99. if(timeoutMs > 0)
  100. {
  101. timer.start(timeoutMs);
  102. QObject::connect(&timer, &QTimer::timeout, &loop, &QEventLoop::quit);
  103. }
  104. // 收到Web信息立即退出
  105. QMetaObject::Connection c = QObject::connect(this, &FromMQTT::signal_webInfoReady, &loop, &QEventLoop::quit);
  106. loop.exec(); // 期间UI主事件循环仍可运行
  107. QObject::disconnect(c);
  108. return m_isGetWebInfoSuccess.load();
  109. }