123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- #include "fromMQTT.h"
- #include <QDebug>
- #include <QJsonDocument>
- #include <QJsonObject>
- #include <QJsonParseError>
- #include "qmqtt_message.h"
- FromMQTT::FromMQTT(QObject* parent) : QObject(parent)
- {
- /* 获取日志 */
- m_logger = spdlog::get("MQTT");
- if(m_logger == nullptr)
- {
- qDebug() << "获取MQTT logger 失败!";
- exit(-1);
- }
- /* 连接信号和槽 */
- connect(&m_client,SIGNAL(connected()),this,SLOT(do_connected()));
- connect(&m_client,SIGNAL(disconnected()),this,SLOT(do_disconnect()));
- connect(&m_client,SIGNAL(error(QMQTT::ClientError)),this,SLOT(do_error(QMQTT::ClientError)));
- connect(&m_client,SIGNAL(received(QMQTT::Message)),this,SLOT(do_received(QMQTT::Message)));
- connect(&m_client,SIGNAL(subscribed(QString,quint8)),this,SLOT(do_subscribed(QString, quint8)));
-
- }
- /* 设置地址 */
- void FromMQTT::setHostName(const QString& hostName)
- {
- QHostAddress addr = QHostAddress(hostName);
- m_client.setHost(addr);
- }
- /* 设置订阅 */
- void FromMQTT::setSubcribe(const QString& topic, int qos)
- {
- m_client.subscribe(topic,qos);
- }
- /* 连接到服务器 */
- void FromMQTT::connectToServer()
- {
- m_client.connectToHost();
- }
- /* 解析所有消息 */
- void FromMQTT::analyzeAllMessage()
- {
- while (!m_queueMessage.isEmpty())
- {
- analyzeOneMessage();
- }
- }
- /* 解析消息数据 */
- void FromMQTT::analyzeOneMessage()
- {
- /* 先解锁,目前没有锁 */
- if(m_queueMessage.isEmpty())
- {
- return;
- }
- SPDLOG_LOGGER_INFO(m_logger,"message:{}",m_queueMessage.front().toStdString());
- /* 出队 */
- m_queueMessage.dequeue();
- }
- /* 连接成功 */
- void FromMQTT::do_connected()
- {
- SPDLOG_LOGGER_INFO(m_logger,"连接成功!");
- m_client.subscribe("test/one");
- }
- /* 断开连接 */
- void FromMQTT::do_disconnect()
- {
- SPDLOG_LOGGER_INFO(m_logger,"断开连接!");
- }
- /* 错误 */
- void FromMQTT::do_error(const QMQTT::ClientError error)
- {
- SPDLOG_LOGGER_ERROR(m_logger,"错误:{}",(int)error);
- }
- /* 订阅成功 */
- void FromMQTT::do_subscribed(const QString& topic, const quint8 qos)
- {
- SPDLOG_LOGGER_INFO(m_logger,"订阅:{},QoS:{} 成功",topic.toStdString(),qos);
- }
- /* 接收到消息 */
- void FromMQTT::do_received(const QMQTT::Message& message)
- {
- // SPDLOG_LOGGER_INFO(m_logger,"接收到一条消息:{}",message.topic().toStdString());
- /* 接收到消息,加入列表中 */
- QByteArray ba = message.payload();
- /* 这里可以添加锁,不过目前是单线程,不需要添加 */
- m_queueMessage.enqueue(ba);
- analyzeOneMessage();
- }
|