fromMQTT.cpp 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. #include "fromMQTT.h"
  2. #include <QDebug>
  3. #include <QJsonDocument>
  4. #include <QJsonObject>
  5. #include <QJsonParseError>
  6. #include "qmqtt_message.h"
  7. FromMQTT::FromMQTT(QObject* parent) : QObject(parent)
  8. {
  9. /* 获取日志 */
  10. m_logger = spdlog::get("MQTT");
  11. if(m_logger == nullptr)
  12. {
  13. qDebug() << "获取MQTT logger 失败!";
  14. exit(-1);
  15. }
  16. /* 连接信号和槽 */
  17. connect(&m_client,SIGNAL(connected()),this,SLOT(do_connected()));
  18. connect(&m_client,SIGNAL(disconnected()),this,SLOT(do_disconnect()));
  19. connect(&m_client,SIGNAL(error(QMQTT::ClientError)),this,SLOT(do_error(QMQTT::ClientError)));
  20. connect(&m_client,SIGNAL(received(QMQTT::Message)),this,SLOT(do_received(QMQTT::Message)));
  21. connect(&m_client,SIGNAL(subscribed(QString,quint8)),this,SLOT(do_subscribed(QString, quint8)));
  22. }
  23. /* 设置地址 */
  24. void FromMQTT::setHostName(const QString& hostName)
  25. {
  26. QHostAddress addr = QHostAddress(hostName);
  27. m_client.setHost(addr);
  28. }
  29. /* 设置订阅 */
  30. void FromMQTT::setSubcribe(const QString& topic, int qos)
  31. {
  32. m_client.subscribe(topic,qos);
  33. }
  34. /* 连接到服务器 */
  35. void FromMQTT::connectToServer()
  36. {
  37. m_client.connectToHost();
  38. }
  39. /* 解析所有消息 */
  40. void FromMQTT::analyzeAllMessage()
  41. {
  42. while (!m_queueMessage.isEmpty())
  43. {
  44. analyzeOneMessage();
  45. }
  46. }
  47. /* 解析消息数据 */
  48. void FromMQTT::analyzeOneMessage()
  49. {
  50. /* 先解锁,目前没有锁 */
  51. if(m_queueMessage.isEmpty())
  52. {
  53. return;
  54. }
  55. SPDLOG_LOGGER_INFO(m_logger,"message:{}",m_queueMessage.front().toStdString());
  56. /* 出队 */
  57. m_queueMessage.dequeue();
  58. }
  59. /* 连接成功 */
  60. void FromMQTT::do_connected()
  61. {
  62. SPDLOG_LOGGER_INFO(m_logger,"连接成功!");
  63. m_client.subscribe("test/one");
  64. }
  65. /* 断开连接 */
  66. void FromMQTT::do_disconnect()
  67. {
  68. SPDLOG_LOGGER_INFO(m_logger,"断开连接!");
  69. }
  70. /* 错误 */
  71. void FromMQTT::do_error(const QMQTT::ClientError error)
  72. {
  73. SPDLOG_LOGGER_ERROR(m_logger,"错误:{}",(int)error);
  74. }
  75. /* 订阅成功 */
  76. void FromMQTT::do_subscribed(const QString& topic, const quint8 qos)
  77. {
  78. SPDLOG_LOGGER_INFO(m_logger,"订阅:{},QoS:{} 成功",topic.toStdString(),qos);
  79. }
  80. /* 接收到消息 */
  81. void FromMQTT::do_received(const QMQTT::Message& message)
  82. {
  83. // SPDLOG_LOGGER_INFO(m_logger,"接收到一条消息:{}",message.topic().toStdString());
  84. /* 接收到消息,加入列表中 */
  85. QByteArray ba = message.payload();
  86. /* 这里可以添加锁,不过目前是单线程,不需要添加 */
  87. m_queueMessage.enqueue(ba);
  88. analyzeOneMessage();
  89. }