#include "ThreadCompareItemManager.h" #include "GlobalInfo.h" #include "SystemConfig.h" #include "CompareItemThread.h" #include "ThreadPool.h" #include "FromMQTT.h" #include "commonDefine.h" /* 给对比项套一层壳,这个函数就是新的线程,在里面new出新的对比项实例,防止Qt报线程归属权错误 在函数中将对比项实例插入到线程管理器中 */ void ThreadCompareItemManager::thread_compareItem(CalculateThreadInfo_t threadInfo) { auto pThread = new CompareItemThread(threadInfo); if(pThread == nullptr) { SPDLOG_ERROR("ThreadCompareItemManager: 创建对比项线程失败"); return; } CompareItemManager.addCompareItemThread(pThread); /* 启动线程,就会一直阻塞在这里了 */ pThread->threadTask(); } ThreadCompareItemManager::ThreadCompareItemManager() { } ThreadCompareItemManager::~ThreadCompareItemManager() { } /* 线程函数 */ void ThreadCompareItemManager::thread_CompareItemManager() { m_logger = spdlog::get("CompareItemManager"); if(m_logger == nullptr) { fmt::print("ThreadCompareItemManager: CompareItemManager Logger not found.\n"); return; } /* 创建定时器和事件循环 */ m_pEventLoop = new QEventLoop(); m_pTimer = new QTimer(); // std::function task = std::bind(&ThreadCompareItemManager::do_task, this); // m_pTimer->setInterval(10000); m_pTimer->setTimerType(Qt::PreciseTimer); m_pTimer->setSingleShot(false); // 设置为非单次定时器 connect(m_pTimer, &QTimer::timeout, this, &ThreadCompareItemManager::do_task, Qt::DirectConnection); /* 初始化webapi */ m_webAPIUrl = GInfo.webAPIUrl(); m_webAPIID = GInfo.webAPIID(); m_webAPIAppType = GInfo.webApiAppType(); if(!m_fromWebAPI.initWebApi(m_webAPIUrl, m_webAPIID, m_webAPIAppType)) { SPDLOG_LOGGER_ERROR(m_logger, "ThreadCompareItemManager: 初始化WebAPI失败"); return; } /* 获取MQTT发布订阅 */ m_pubTopic = GInfo.mqttPubTopicCompareItem(); /* 获取基础配置,目前只获取一次 */ updateBaseSettings(); SPDLOG_LOGGER_INFO(m_logger, "开启对比项管理线程"); m_pTimer->start(1000); m_pEventLoop->exec(); SPDLOG_LOGGER_INFO(m_logger, "ThreadCompareItemManager: 线程结束"); } /* 添加对比项实例 */ void ThreadCompareItemManager::addCompareItemThread(CompareItemThread* pThread) { if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "添加对比项线程失败,线程指针为空"); return; } std::lock_guard lock(m_mutexCompareItemThreads); int compareItemID = pThread->getThreadInfo().compareItemInfo.nID; if(m_mapThreads.contains(compareItemID)) { SPDLOG_LOGGER_WARN(m_logger, "对比项线程已存在,ID: {}", compareItemID); return; // 对比项线程已存在 } m_mapThreads.insert(compareItemID, pThread); SPDLOG_LOGGER_INFO(m_logger, "添加对比项线程成功,ID: {}", compareItemID); } /* 通过对比项ID和通道ID获取声卡通道信息 */ SoundCardRoadInfo_t ThreadCompareItemManager::getSoundCardRoadInfo(int compareItemID, int roadNum) { std::lock_guard lock(m_mutexCompareItemThreads); auto it = m_mapThreads.find(compareItemID); if(it == m_mapThreads.end()) { SPDLOG_LOGGER_WARN(m_logger, "对比项线程不存在,ID: {}", compareItemID); return SoundCardRoadInfo_t(); } auto compareInfo = it.value()->getThreadInfo().compareItemInfo; SoundCardRoadInfo_t roadInfo; for(const auto& road : compareInfo.mapRoad) { if(road.nCompareRoadNum == roadNum) { roadInfo = road.scRoadInfo; break; } } return roadInfo; } /* 任务函数 */ void ThreadCompareItemManager::do_task() { // SPDLOG_LOGGER_ERROR(m_logger, "ThreadCompareItemManager: do_task() 函数被调用"); /* 如果定时间隔小于10秒,则设置成10秒,一开始小是为了线程开启后立马执行一次 */ if(m_pTimer->interval() < 10000) { // m_pTimer->stop(); m_pTimer->setInterval(10000); // m_pTimer->start(); } if(m_pFromMQTT == nullptr) { /* 初始化MQTT */ initMQTT(); } // SPDLOG_LOGGER_WARN(m_logger, "定时器是否在运行: {}, 触发间隔: {} ms", m_pTimer->isActive(), m_pTimer->interval()); /* ------------------------------------------------------------------ * 处理对比项信息 * ------------------------------------------------------------------ */ processCompareItemInfo(); /* ------------------------------------------------------------------ * 更新检测时段 * ------------------------------------------------------------------ */ processDetectPeriodInfo(); /* ------------------------------------------------------------------ * 更新对比项信息到MQTT * ------------------------------------------------------------------ */ updateCompareItemInfoToMQTT(); // SPDLOG_LOGGER_WARN(m_logger, "ThreadCompareItemManager: do_task() 函数执行完毕"); } /* 更新基础设置信息,如数据库设置,噪音参数等 */ bool ThreadCompareItemManager::updateBaseSettings() { /* 更新基础数据 */ QMap baseSettings; if(!m_fromWebAPI.getSystemConfig(baseSettings)) { SPDLOG_LOGGER_ERROR(m_logger, "获取系统配置失败"); return false; } /* 将获取到的配置转换成结构体 */ SysConfig.parseConfigFromDatabase(baseSettings); /* 检测时段单独获取 */ QMap mapDetectConfig; if(!m_fromWebAPI.getDetectPeriodConfig(mapDetectConfig)) { SPDLOG_ERROR("获取对比项检测时段配置失败"); return false; } SysConfig.setDetectPeriodConfig(mapDetectConfig); return true; } /* 对比项信息处理函数 */ void ThreadCompareItemManager::processCompareItemInfo() { /* 获取对比项信息 */ QList listNewItems; if(!m_fromWebAPI.getCompareItemInfo(listNewItems)) { SPDLOG_LOGGER_DEBUG(m_logger, "ThreadCompareItemManager: 获取对比项失败"); return; } checkCompareItemInfo(listNewItems, m_listCreateItems, m_listUpdateItems, m_listDeleteItems); SPDLOG_LOGGER_DEBUG(m_logger, "要退出的对比项个数: {}, 要更新的对比项个数: {}, 要创建的对比项个数: {}", m_listDeleteItems.size(), m_listUpdateItems.size(), m_listCreateItems.size()); /* 先删除已消失的对比项信息 */ processDeleteCompareItemThreads(m_listDeleteItems); /* 更新需要更新的线程 */ updateRunningThreads(m_listUpdateItems); /* 再创建新的对比项线程 */ createNewCompareItemThreads(m_listCreateItems); } /** * @brief 处理对比项信息,新获取的和已有的对比,会在这里更新 m_mapNowCompareItem 内容 * * @param createList 创建列表 * @param updateList 更新列表,根据对比项ID进行更新信息 * @param deleteList 删除列表 */ void ThreadCompareItemManager::checkCompareItemInfo(QList& newList, QList& createList, QList& updateList, QList& deleteList) { createList.clear(); updateList.clear(); deleteList.clear(); m_mapNowCompareItem.clear(); // QMap mapNowItems; /* 先从对比项线程中获取对比项信息 */ for(auto it = m_mapThreads.begin(); it != m_mapThreads.end(); ++it) { BaseCalculateThread* pThread = it.value(); if(pThread == nullptr) { continue; } /* 获取对比项信息 */ CompareItemInfo_t itemInfo = pThread->getThreadInfo().compareItemInfo; m_mapNowCompareItem.insert(itemInfo.nID, itemInfo); } /* 遍历新获取的对比项信息,找出需要新增的对比项和需要更新的对比项 */ for(const CompareItemInfo_t& item : newList) { if(!m_mapNowCompareItem.contains(item.nID)) { /* 新对比项,添加到创建列表 */ createList.append(item); } else { /* 已有对比项,检查是否需要更新 */ const CompareItemInfo_t& existingItem = m_mapNowCompareItem.value(item.nID); /* 先对比基础信息 */ if(!existingItem.isEqualBase(item)) { /* 基础信息不同,需要更新 */ updateList.append(item); continue; } /* 在对比对比项通道信息 */ if(!existingItem.isEqualRoads(item)) { /* 通道信息不同,需要更新 */ updateList.append(item); continue; } } } /* 遍历当前对比项信息,找出需要删除的对比项 */ for(auto it : m_mapNowCompareItem) { bool isFound = false; for(const CompareItemInfo_t& newItem : newList) { if(it.nID == newItem.nID) { isFound = true; break; // 找到对应的对比项,不需要删除 } } if(!isFound) { /* 当前对比项不在新获取的对比项中,说明需要删除 */ deleteList.append(it.nID); } } } /** * @brief 处理需要删除的对比项线程 * 1、先处理已经停止的线程 * 2、再将这次列表中的对比项ID对应的线程设置为停止状态,待到下次循环再删除已经停止完成的线程 * * @param deleteList */ void ThreadCompareItemManager::processDeleteCompareItemThreads(const QList& deleteList) { /* 先处理已经停止运行的线程 */ for(auto it = m_mapThreads.begin(); it != m_mapThreads.end();) { BaseCalculateThread* pThread = it.value(); if(pThread == nullptr) { SPDLOG_LOGGER_WARN(m_logger, "对比项线程指针为空,即将删除该线程指针"); it = m_mapThreads.erase(it); continue; } if(pThread->getThreadInfo().threadState == EThreadState::State_Stopped) { /* 线程已经停止,直接删除 */ SPDLOG_LOGGER_INFO(m_logger, "对比项线程 {} 已经停止,准备删除", pThread->getThreadInfo().compareItemInfo.strName.toStdString()); delete pThread; it = m_mapThreads.erase(it); continue; } ++it; } /* 停止本次需要停止的线程 */ for(auto it : m_mapThreads) { int compareItemID = it->getThreadInfo().compareItemInfo.nID; if(deleteList.contains(compareItemID)) { /* 设置线程停止标志 */ it->stopThread(); SPDLOG_LOGGER_INFO(m_logger, "对比项线程 {} 设置为停止状态", it->getThreadInfo().compareItemInfo.strName.toStdString()); } } } /* 更新正在运行的线程信息 */ void ThreadCompareItemManager::updateRunningThreads(const QList& updateList) { if(updateList.isEmpty()) { return; } for(const CompareItemInfo_t& item : updateList) { auto it = m_mapThreads.find(item.nID); if(it == m_mapThreads.end()) { SPDLOG_LOGGER_WARN(m_logger, "对比项线程 {} 不存在,无法更新信息", item.strName.toStdString()); continue; } BaseCalculateThread* pThread = it.value(); if(pThread == nullptr) { continue; } CalculateThreadInfo_t threadInfo; threadInfo.compareItemInfo = item; pThread->updateThreadInfo(threadInfo); } } /* 创建新的线程 */ bool ThreadCompareItemManager::createNewCompareItemThreads(const QList& createList) { if(createList.isEmpty()) { SPDLOG_LOGGER_DEBUG(m_logger, "没有新的对比项需要创建"); return true; } for(auto& it : createList) { /* 创建新的对比项线程 */ CalculateThreadInfo_t threadInfo; threadInfo.compareItemInfo = it; threadInfo.threadType = EThreadType::Type_CompareItem; threadInfo.threadState = EThreadState::State_Inited; CPPTP.add_task(&ThreadCompareItemManager::thread_compareItem, threadInfo); /* 创建线程对象 */ // CompareItemThread* pThread = new CompareItemThread(threadInfo); // if(pThread == nullptr) // { // SPDLOG_LOGGER_ERROR(m_logger, "创建对比项线程 {} 失败", it.strName.toStdString()); // return false; // } /* 启动线程 */ // CPPTP.add_task(&CompareItemThread::threadTask, pThread); /* 添加到线程列表中 */ // m_mapThreads.insert(it.nID, pThread); } return true; } /* 处理检测时段信息 */ void ThreadCompareItemManager::processDetectPeriodInfo() { /* 获取计划信息 */ QMap mapNewDetectConfig; if(!m_fromWebAPI.getDetectPeriodConfig(mapNewDetectConfig)) { SPDLOG_LOGGER_ERROR(m_logger, "获取检测时段配置失败"); return; } QMap mapUpdateDetectConfig; checkDetectPeriodInfo(mapNewDetectConfig, mapUpdateDetectConfig); /* 更新检测时段 */ for(const auto& it : mapUpdateDetectConfig) { auto threadIt = m_mapThreads.find(it.nID); if(threadIt != m_mapThreads.end()) { /* 找到对应的对比项线程,更新检测时段 */ CompareItemThread* pThread = dynamic_cast(threadIt.value()); if(pThread != nullptr) { pThread->setDetectPeriod(it); SPDLOG_LOGGER_TRACE(m_logger, "更新对比项 {} 的检测时段", pThread->getThreadInfo().compareItemInfo.strName.toStdString()); } } } } /* 检查获取出更新的对比项信息 */ void ThreadCompareItemManager::checkDetectPeriodInfo(QMap newDetectInfo, QMap& updateList) { for(const auto& it : newDetectInfo) { int compareItemID = it.nID; for(const auto& existingItem : m_mapDetectPeriod) { if(existingItem.nID == compareItemID) { /* 已经存在的对比项,检查是否需要更新 */ if(existingItem == it) { /* 对比项信息相同,不需要更新 */ continue; } else { /* 对比项信息不同,需要更新 */ updateList.insert(compareItemID, it); } return; } } } } /* 更新对比项信息到MQTT */ void ThreadCompareItemManager::updateCompareItemInfoToMQTT() { /* 生成发送的json文件 */ nJson jsonArray = nJson::array(); for(const auto& it : m_mapNowCompareItem) { nJson jsonItem; /* 对比项ID */ jsonItem["compareItem_id"] = it.nID; /* 对比项名称 */ jsonItem["compareItem_name"] = it.strName.toStdString(); for(const auto& road : it.mapRoad) { nJson jsonRoad; /* 对比项通道编号和名称 */ jsonRoad["road_num"] = road.nCompareRoadNum; jsonRoad["road_name"] = road.strCompareRoadName.toStdString(); /* 通道使用的声卡编号 */ jsonRoad["sound_card_num"] = road.scRoadInfo.nSoundCardNum; jsonRoad["sound_card_road_num"] = road.scRoadInfo.roadInfo.nRoadNum; jsonRoad["compareItem_roads"].push_back(jsonRoad); } /* 静音条件 */ jsonItem["silence_switch"] = it.paramMute.isEnable; jsonItem["silence_threshold"] = it.paramMute.threshold.nThreshold; jsonItem["silence_duration"] = it.paramMute.nLen; jsonItem["silence_sensitivity"] = it.paramMute.nSensitivity; /* 过载条件 */ jsonItem["overload_switch"] = it.paramOverload.isEnable; jsonItem["overload_threshold"] = it.paramOverload.threshold.nThreshold; jsonItem["overload_duration"] = it.paramOverload.nLen; jsonItem["overload_sensitivity"] = it.paramOverload.nSensitivity; /* 反相条件 */ jsonItem["reverse_switch"] = it.paramPhase.isEnable; jsonItem["reverse_threshold"] = it.paramPhase.threshold.dThreshold; jsonItem["reverse_duration"] = it.paramPhase.nLen; jsonItem["reverse_sensitivity"] = it.paramPhase.nSensitivity; jsonArray.push_back(jsonItem); } /* 发送到MQTT */ if(m_pFromMQTT->connectState() == QMQTT::ConnectionState::STATE_CONNECTED) { QByteArray jsonData = QByteArray::fromStdString(jsonArray.dump()); if(!m_pFromMQTT->sendMessage(m_pubTopic, jsonData, 0, true)) { SPDLOG_LOGGER_WARN(m_logger, "ThreadCompareItemManager: 发送对比项信息到MQTT失败"); }else { SPDLOG_LOGGER_TRACE(m_logger, "ThreadCompareItemManager: 发送对比项信息到MQTT成功"); } } else { SPDLOG_LOGGER_WARN(m_logger, "m_pFromMQTT 未连接到服务器,无法发送"); } } /* 初始化MQTT */ void ThreadCompareItemManager::initMQTT() { if(m_pFromMQTT == nullptr) { m_pFromMQTT = new FromMQTT(); if(m_pFromMQTT == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "创建MQTT对象失败"); return; } } /* 登陆MQTT */ m_pFromMQTT->setIPAndPort(GInfo.mqttIP(), GInfo.mqttPort()); // m_pFromMQTT->addSubcribe("LH_WEBINFO"); m_pFromMQTT->setAutoReconnect(true); m_pFromMQTT->connectToServer(); // connect(m_pFromMQTT, &FromMQTT::signal_recvMessage, [this](const QMQTT::Message& message) { // SPDLOG_LOGGER_WARN(m_logger, "--------------------- 接收到MQTT消息: {}", message.topic().toStdString()); // SPDLOG_LOGGER_WARN(m_logger, "消息内容: {}", message.payload().toStdString()); // }); SPDLOG_LOGGER_INFO(m_logger, "☆ 连接MQTT服务器: {}:{}, 对比项信息订阅主题: {}", GInfo.mqttIP().toStdString(), GInfo.mqttPort(), m_pubTopic.toStdString()); }