MQTTBase.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. #include "MQTTBase.h"
  2. #include <QDebug>
  3. #include "qmqtt_message.h"
  4. #include "spdlog/spdlog.h"
  5. MQTTBase::MQTTBase(QObject* parent) : QObject(parent)
  6. {
  7. /* 获取日志 */
  8. // m_logger = spdlog::get("MQTT");
  9. // if(m_logger == nullptr)
  10. // {
  11. // SPDLOG_ERROR("获取MQTT logger 失败!");
  12. // exit(-1);
  13. // }
  14. // m_client.setKeepAlive(2); // 设置心跳时间为5秒
  15. m_reconnectTimer.setSingleShot(false);
  16. /* 连接信号和槽 */
  17. connect(&m_client,SIGNAL(connected()),this,SLOT(do_connected()));
  18. connect(&m_client,SIGNAL(disconnected()),this,SLOT(do_disconnect()));
  19. connect(&m_client,SIGNAL(error(QMQTT::ClientError)),this,SLOT(do_error(QMQTT::ClientError)));
  20. connect(&m_client,SIGNAL(subscribed(QString,quint8)),this,SLOT(do_subscribed(QString, quint8)));
  21. connect(&m_client,SIGNAL(received(QMQTT::Message)),this,SLOT(do_received(QMQTT::Message)));
  22. connect(&m_client, &QMQTT::Client::received, this, &MQTTBase::signal_recvMessage);
  23. connect(&m_reconnectTimer, &QTimer::timeout, this, &MQTTBase::do_timeoutReconnect);
  24. }
  25. /* 设置地址 */
  26. void MQTTBase::setIPAndPort(const QString& IP, int port)
  27. {
  28. m_mqttIP = IP;
  29. m_mqttPort = port;
  30. QHostAddress addr = QHostAddress(IP);
  31. m_client.setHost(addr);
  32. m_client.setPort(port);
  33. }
  34. /* 设置订阅 */
  35. void MQTTBase::addSubcribe(const QString& topic, int qos)
  36. {
  37. if(qos > 2)
  38. {
  39. SPDLOG_ERROR("QoS值不合法:{}", qos);
  40. return;
  41. }
  42. if(m_client.connectionState() == QMQTT::ConnectionState::STATE_CONNECTED)
  43. {
  44. /* 已连接到MQTT,之前添加的主题已经订阅,现在添加的也直接添加订阅 */
  45. m_client.subscribe(topic, qos);
  46. m_mapTopic.insert(topic, qos);
  47. }else
  48. {
  49. /* 还未连接到MQTT,直接加入到订阅列表 */
  50. m_mapTopic.insert(topic, qos);
  51. }
  52. }
  53. /* 设置自动重连 */
  54. void MQTTBase::setAutoReconnect(bool isAuto)
  55. {
  56. m_isAutoReconnect = isAuto;
  57. m_client.setAutoReconnect(isAuto);
  58. }
  59. /* 连接到服务器 */
  60. void MQTTBase::connectToServer()
  61. {
  62. if(m_client.connectionState() == QMQTT::ConnectionState::STATE_CONNECTED)
  63. {
  64. SPDLOG_INFO("MQTT已经连接到服务器,无需重复连接");
  65. return;
  66. }
  67. else if(m_client.connectionState() == QMQTT::ConnectionState::STATE_CONNECTING)
  68. {
  69. SPDLOG_INFO("MQTT正在连接到服务器,请稍后");
  70. return;
  71. }
  72. m_client.connectToHost();
  73. }
  74. /* 获取连接状态 */
  75. QMQTT::ConnectionState MQTTBase::connectState()
  76. {
  77. return m_client.connectionState();
  78. }
  79. /* 发送消息 */
  80. bool MQTTBase::sendMessage(const QString& topic, const QByteArray& message, int qos)
  81. {
  82. if(m_client.connectionState() != QMQTT::ConnectionState::STATE_CONNECTED)
  83. {
  84. SPDLOG_ERROR("MQTT未连接到服务器,发送消息失败");
  85. return false;
  86. }
  87. QMQTT::Message msg(0, topic, message, qos);
  88. auto ret = m_client.publish(msg);
  89. if(ret != 0)
  90. {
  91. SPDLOG_ERROR("发送消息失败:{}, 错误代码:{}", topic.toStdString(), ret);
  92. return false;
  93. }
  94. return true;
  95. }
  96. /* 发送消息,设置消息保留 */
  97. bool MQTTBase::sendMessage(const QString& topic, const QByteArray& message, int qos, bool retain)
  98. {
  99. if(m_client.connectionState() != QMQTT::ConnectionState::STATE_CONNECTED)
  100. {
  101. SPDLOG_ERROR("MQTT未连接到服务器,发送消息失败");
  102. return false;
  103. }
  104. QMQTT::Message msg;
  105. msg.setTopic(topic);
  106. msg.setPayload(message);
  107. msg.setQos(qos);
  108. msg.setRetain(retain); // 设置消息保留
  109. auto ret = m_client.publish(msg);
  110. if(ret != 0)
  111. {
  112. SPDLOG_ERROR("发送消息失败:{}, 错误代码:{}", topic.toStdString(), ret);
  113. return false;
  114. }
  115. return true;
  116. }
  117. /* 连接成功 */
  118. void MQTTBase::do_connected()
  119. {
  120. SPDLOG_INFO("MQTT IP:{} ,Port:{} 连接成功!", m_mqttIP.toStdString(), m_mqttPort);
  121. if(m_reconnectTimer.isActive())
  122. {
  123. m_reconnectTimer.stop();
  124. }
  125. /* 订阅所有的主题 */
  126. for(auto& it : m_mapTopic.keys())
  127. {
  128. auto qos = m_mapTopic.value(it);
  129. m_client.subscribe(it, qos);
  130. }
  131. }
  132. /* 断开连接 */
  133. void MQTTBase::do_disconnect()
  134. {
  135. SPDLOG_INFO("MQTT 断开连接!");
  136. if(m_isAutoReconnect)
  137. {
  138. SPDLOG_INFO("MQTT 自动重连已开启,正在尝试重新连接...");
  139. if(m_reconnectTimer.isActive())
  140. {
  141. m_reconnectTimer.stop();
  142. }
  143. m_reconnectTimer.start(6000);
  144. connectToServer();
  145. }
  146. emit signal_disconnected();
  147. // m_isConnected = false;
  148. }
  149. /* 错误 */
  150. void MQTTBase::do_error(const QMQTT::ClientError error)
  151. {
  152. SPDLOG_ERROR("MQTT 错误:{}", (int)error);
  153. // if(m_isAutoReconnect)
  154. // {
  155. // SPDLOG_INFO("MQTT 自动重连已开启,正在尝试重新连接...");
  156. // m_client.connectToHost();
  157. // }
  158. }
  159. /* 订阅成功 */
  160. void MQTTBase::do_subscribed(const QString& topic, const quint8 qos)
  161. {
  162. SPDLOG_INFO("MQTT 订阅:{}, QoS:{} 成功", topic.toStdString(), qos);
  163. }
  164. /* 接收到消息 */
  165. void MQTTBase::do_received(const QMQTT::Message& message)
  166. {
  167. // SPDLOG_INFO("MQTTBase接收到一条消息:{}", message.topic().toStdString());
  168. recvMessage(message);
  169. }
  170. /* 超时重连 */
  171. void MQTTBase::do_timeoutReconnect()
  172. {
  173. SPDLOG_INFO("MQTT 超时重连...");
  174. connectToServer();
  175. }