#include "CompareItemThread.h" #include "FromMQTT.h" #include "GlobalInfo.h" #include "ThreadManager.h" #include "CalculateDBThread.h" #include "NoiseDetectThread.h" #include "CompareDoubleThread.h" #include "ThreadPool.h" #include "commonDefine.h" #include "ThreadAlarmManager.h" #include "spdlog.h" #include CompareItemThread::CompareItemThread(CalculateThreadInfo_t& threadInfo) : QObject(nullptr), BaseCalculateThread(threadInfo) { } CompareItemThread::~CompareItemThread() { } /* 重写父类的线程函数,这里重新实现 */ void CompareItemThread::threadTask() { m_logBase = fmt::format("对比项: {}", m_threadInfo.compareItemInfo.strName.toStdString()); SPDLOG_LOGGER_INFO(m_logger, "----------------------------------------------------------------"); SPDLOG_LOGGER_INFO(m_logger, "{} 线程开始运行, 对比通道: ", m_logBase); for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { SPDLOG_LOGGER_INFO(m_logger, " 通道名称: {}, 通道编号: {}, 声卡通道: {}:{}", road.strCompareRoadName.toStdString(), road.nCompareRoadNum, road.scRoadInfo.strSoundCardName.toStdString(), road.scRoadInfo.roadInfo.nRoadNum); } SPDLOG_LOGGER_INFO(m_logger, "----------------------------------------------------------------"); /* 初始化数据 */ if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } /* 更新线程状态标志 */ m_threadInfo.threadState = EThreadState::State_Running; m_isStop = false; m_pTimer = new QTimer(); if(m_pTimer == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建定时器失败", m_logBase); } // SPDLOG_LOGGER_WARN(m_logger, "{} 创建定时器成功,开启定时器", m_logBase); m_pTimer->setTimerType(Qt::PreciseTimer); m_pTimer->setSingleShot(false); // 设置为非单次定时器 m_pTimer->setInterval(30); m_eventLoop.connect(m_pTimer, &QTimer::timeout, this, &CompareItemThread::do_timeout, Qt::DirectConnection); m_pTimer->start(); /* 开启事件循环 */ m_eventLoop.exec(); /* 等待 */ // std::this_thread::sleep_for(std::chrono::seconds(20)); /* 线程结束,清理数据 */ clearData(); m_threadInfo.threadState = EThreadState::State_Stopped; m_isStop = true; SPDLOG_LOGGER_WARN(m_logger, "★ {} 线程结束运行", m_logBase); } /* 停止线程函数 */ void CompareItemThread::stopThread() { if(m_pTimer != nullptr) { if(m_pTimer->isActive()) { m_pTimer->stop(); } disconnect(m_pTimer, &QTimer::timeout, this, &CompareItemThread::do_timeout); delete m_pTimer; m_pTimer = nullptr; } /* 停止事件循环 */ m_eventLoop.quit(); } void CompareItemThread::stopThreadBlock() { stopThread(); while(!m_isStop) // 等待线程停止 { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } /* 设置检测时段 */ void CompareItemThread::setDetectPeriod(const DetectPeriodConfig_t& detectPeriod) { std::lock_guard lock(m_mutexDetectPeriod); if(detectPeriod.nID != m_threadInfo.compareItemInfo.nID) { SPDLOG_LOGGER_ERROR(m_logger, "{} 设置检测时段失败,ID不匹配: {}, 当前ID: {}", m_logBase, detectPeriod.nID, m_detectPeriod.nID); return; } m_detectPeriod = detectPeriod; // m_isDetectPeriodUpdated = true; // 标记检测时段已更新 } /* 线程功能函数 */ void CompareItemThread::task() { m_logBase = fmt::format("对比项: {}", m_threadInfo.compareItemInfo.strName.toStdString()); SPDLOG_LOGGER_INFO(m_logger, "----------------------------------------------------------------"); SPDLOG_LOGGER_INFO(m_logger, "{} 线程开始运行, 对比通道: ", m_logBase); for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { SPDLOG_LOGGER_INFO(m_logger, " 通道名称: {}, 通道编号: {}, 声卡通道: {}:{}", road.strCompareRoadName.toStdString(), road.nCompareRoadNum, road.scRoadInfo.strSoundCardName.toStdString(), road.scRoadInfo.roadInfo.nRoadNum); } SPDLOG_LOGGER_INFO(m_logger, "----------------------------------------------------------------"); /* 初始化数据 */ if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } // SPDLOG_LOGGER_WARN(m_logger, "{} 初始化数据完成,开始对比检测, isRunning: {}", m_logBase, m_isRunning.load()); while (m_isRunning) { /* 睡眠100ms */ std::this_thread::sleep_for(std::chrono::milliseconds(50)); timerTask(); } /* 清理数据 */ clearData(); SPDLOG_LOGGER_WARN(m_logger, "★ {} 线程结束运行", m_logBase); } /* 定时器任务 */ void CompareItemThread::timerTask() { /* ------------------------------------------------------------------------------------- * 更新对比项信息 * ------------------------------------------------------------------------------------- */ if(updateThreadInfoInternal()) { SPDLOG_LOGGER_INFO(m_logger, "{} 暂停对比检测,更新对比项信息"); m_threadInfo.compareItemInfo = m_threadInfoNew.compareItemInfo; initData(); SPDLOG_LOGGER_INFO(m_logger, "{} 更新对比项信息完成,继续检测对比"); } /* 更新检测时间段 */ checkDetectPeriod(); /* ------------------------------------------------------------------------------------- * 更新数据 * ------------------------------------------------------------------------------------- */ if(!updateResultData()) { return; } /* ------------------------------------------------------------------------------------- * 处理数据,将报警信息给写报警数据的线程,现在直接由各个计算线程直接写入 * ------------------------------------------------------------------------------------- */ // processAlarmData(); /* ------------------------------------------------------------------------------------- * 将音量包数据发送到MQTT中 * ------------------------------------------------------------------------------------- */ sendResultData(); /* 清除标志位 */ clearUpdateFlags(); } /* 初始化数据 */ bool CompareItemThread::initData() { /* 创建录音通道线程 */ for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { ThreadMan.createRecordThread(road.scRoadInfo, m_threadInfo.compareItemInfo.nID); } /* 创建计算音量报警信息的线程指针 */ destroyCalculateDBThreads(); // 清理之前的线程 createCalculateDBThreads(); /* 创建两个对比线程,主通道是第一个通道,其他都需要和主通道进行对比 */ destroyCompareThreads(); createCompareThreads(); /* 创建计算噪音的线程 */ destroyNoiseDetectThreads(); createNoiseDetectThreads(); /* 获取创建实时音量包的线程 */ if(!getCreateDBThread()) { // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取创建实时音量线程失败", m_logBase); return false; } // for(const auto& pThread : m_mapCreateDBThreads) // { // if(pThread.second == nullptr) // { // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取创建实时音量线程失败,通道: {}", m_logBase, pThread.first); // }else { // SPDLOG_LOGGER_INFO(m_logger, "{} 获取创建实时音量线程成功,通道: {}", m_logBase, pThread.first); // } // } /* 初始化存储结果的数据结构 */ m_compareResult = CompareResult_t(); m_compareResult.compareItemID = m_threadInfo.compareItemInfo.nID; m_compareResult.compareItemName = m_threadInfo.compareItemInfo.strName.toStdString(); m_compareResult.dateTime = QDateTime::currentDateTime(); m_compareResult.isClientAlarm = false; // 默认不报警 m_compareResult.mapRoadVolumes.clear(); // 清空之前的数据 for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { OneRoadVolume_t oneRoadVolume; oneRoadVolume.roadInfo = road; // 设置通道信息 oneRoadVolume.dateTime = QDateTime::currentDateTime(); // 初始化时间 m_compareResult.mapRoadVolumes.insert({road.nCompareRoadNum, oneRoadVolume}); m_mapRealTimeData.insert({road.nCompareRoadNum, std::list()}); // 初始化实时数据 } m_mapCDBUpdated.clear(); for(auto it : m_threadInfo.compareItemInfo.mapRoad) { m_mapCDBUpdated.insert({it.nCompareRoadNum, false}); // 初始化更新标志位为false } QDateTime currentTime = QDateTime::currentDateTime(); m_lastDetectPeriodUpdateTime = currentTime; return true; } /* 清理数据 */ void CompareItemThread::clearData() { /* 停止所有的比对线程 */ destroyCompareThreads(); /* 销毁音量报警线程 */ destroyCalculateDBThreads(); /* 销毁噪音检测线程 */ destroyNoiseDetectThreads(); /* 移除使用到的录音通道 */ for(auto& it : m_threadInfo.compareItemInfo.mapRoad) { SoundCardRoadInfo_t roadInfo = it.scRoadInfo; if(!ThreadMan.removeRecordThread(roadInfo, m_threadInfo.compareItemInfo.nID)) { SPDLOG_LOGGER_ERROR(m_logger, "{} 移除录音通道 {}:{} 失败", m_logBase, roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); } } if(m_pFromMQTT != nullptr) { delete m_pFromMQTT; // 删除MQTT对象 m_pFromMQTT = nullptr; // 设置为nullptr } } /* 定时器槽函数 */ void CompareItemThread::do_timeout() { std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now(); // SPDLOG_LOGGER_WARN(m_logger, "{} 定时器触发,开始执行定时任务", m_logBase); if(m_pFromMQTT == nullptr) { initMQTT(); } timerTask(); std::chrono::steady_clock::time_point endTime = std::chrono::steady_clock::now(); std::chrono::milliseconds duration = std::chrono::duration_cast(endTime - startTime); SPDLOG_LOGGER_DEBUG(m_logger, "{} 定时器任务完成,耗时: {} ms", m_logBase, duration.count()); } /* 初始化MQTT */ void CompareItemThread::initMQTT() { if(m_pFromMQTT == nullptr) { m_pFromMQTT = new FromMQTT(); if(m_pFromMQTT == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建MQTT对象失败", m_logBase); return; // 创建MQTT对象失败 } } /* 登陆MQTT */ m_pFromMQTT->setIPAndPort(GInfo.mqttIP(), GInfo.mqttPort()); m_pFromMQTT->setAutoReconnect(); m_pFromMQTT->connectToServer(); m_pubTopic = QString("%1/%2").arg(GInfo.mqttPubTopicDB()).arg(QString::number(m_threadInfo.compareItemInfo.nID)); SPDLOG_LOGGER_INFO(m_logger, "☆ {} 连接MQTT服务器: {}:{}, 音量包订阅主题: {}", m_logBase, GInfo.mqttIP().toStdString(), GInfo.mqttPort(), m_pubTopic.toStdString()); } /* 获取创建实时音量的线程,这个线程属于录音线程,不需要在这里移除和销毁 */ bool CompareItemThread::getCreateDBThread() { m_mapCreateDBThreads.clear(); QDateTime startTime = QDateTime::currentDateTime(); /* 先设置有多少录音通道 */ for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { m_mapCreateDBThreads.insert({road.nCompareRoadNum, nullptr}); // 初始化为nullptr } while(true) { bool isAllThreadsReady = true; for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { CreateDBThread* pThread = ThreadMan.getCreateDBThread(road.scRoadInfo.nSoundCardNum, road.scRoadInfo.roadInfo.nRoadNum); if(pThread != nullptr) { m_mapCreateDBThreads[road.nCompareRoadNum] = pThread; // SPDLOG_LOGGER_INFO(m_logger, "{} 获取创建实时音量线程成功,通道: {}:{}", m_logBase, road.scRoadInfo.strSoundCardName.toStdString(), road.scRoadInfo.roadInfo.nRoadNum); } else { isAllThreadsReady = false; SPDLOG_LOGGER_WARN(m_logger, "{} 获取创建实时音量线程失败,通道: {}:{}", m_logBase, road.scRoadInfo.strSoundCardName.toStdString(), road.scRoadInfo.roadInfo.nRoadNum); } } if(isAllThreadsReady) { return true; } QDateTime currentTime = QDateTime::currentDateTime(); if(startTime.secsTo(currentTime) > 10) // 超过10秒还没有获取到线程,直接返回失败 { SPDLOG_LOGGER_ERROR(m_logger, "{} 获取创建实时音量线程超时,通道: {}", m_logBase, m_threadInfo.compareItemInfo.strName.toStdString()); return false; } } } /* 创建两个对比线程,主通道是第一个通道,其他都需要和主通道进行对比 */ bool CompareItemThread::createCompareThreads() { auto it = m_threadInfo.compareItemInfo.mapRoad.begin(); auto mainRoad = it.value(); // 主通道信息 it++; // 移动到下一个通道 // uint64_t size = m_threadInfo.compareItemInfo.mapRoad.size(); for(; it != m_threadInfo.compareItemInfo.mapRoad.end(); it++) { CalculateThreadInfo_t compareThreadInfo; compareThreadInfo.compareItemInfo = m_threadInfo.compareItemInfo; compareThreadInfo.compareItemInfo.mapRoad.clear(); // 清空通道信息 compareThreadInfo.compareItemInfo.mapRoad.insert(mainRoad.nCompareRoadNum, mainRoad); // 添加主通道 compareThreadInfo.compareItemInfo.mapRoad.insert(it.key(), it.value()); // 添加当前通道 compareThreadInfo.threadState = EThreadState::State_Inited; CompareDoubleThread* pThread = new CompareDoubleThread(compareThreadInfo); if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建对比线程 {} - {} 失败", m_logBase, mainRoad.strCompareRoadName.toStdString(), it.value().strCompareRoadName.toStdString()); return false; } m_mapCompareDoubleThreads.insert({it.key(), pThread}); // 保存线程指针 /* 开始运行 */ CPPTP.add_task(&CompareDoubleThread::threadTask, pThread); } return true; } /* 销毁两两对比线程 */ void CompareItemThread::destroyCompareThreads() { if(m_mapCompareDoubleThreads.size() == 0) { return; // 没有对比线程 } SPDLOG_LOGGER_INFO(m_logger, "{} 销毁对比线程", m_logBase); for(auto& pair : m_mapCompareDoubleThreads) { if(pair.second != nullptr) { pair.second->stopThreadBlock(); // 停止线程 delete pair.second; // 删除线程 pair.second = nullptr; // 设置为nullptr } } m_mapCompareDoubleThreads.clear(); m_mapCDBUpdated.clear(); // 清空更新标志位 SPDLOG_LOGGER_INFO(m_logger, "{} 对比线程销毁完成", m_logBase); } /* 创建计算音量报警的线程 */ bool CompareItemThread::createCalculateDBThreads() { for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { CalculateThreadInfo_t threadInfo; threadInfo.compareItemInfo.mapRoad.clear(); // 清空通道信息 threadInfo.compareItemInfo.mapRoad.insert(road.nCompareRoadNum, road); // 添加当前通道 threadInfo.threadState = EThreadState::State_Inited; // 初始化线程状态 threadInfo.threadType = EThreadType::Type_CalculateDB; CalculateDBThread* pThread = new CalculateDBThread(threadInfo); if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建音量计算线程失败", m_logBase); // return false; // 获取线程失败 } CPPTP.add_task(&CalculateDBThread::threadTask, pThread); m_mapCalculateDBThreads.insert({road.nCompareRoadNum, pThread}); // 保存线程指针 } return true; } /* 销毁音量计算的线程 */ void CompareItemThread::destroyCalculateDBThreads() { if(m_mapCalculateDBThreads.size() == 0) { return; // 没有音量计算线程 } SPDLOG_LOGGER_INFO(m_logger, "{} 销毁音量计算线程", m_logBase); for(auto& pair : m_mapCalculateDBThreads) { if(pair.second != nullptr) { pair.second->stopThreadBlock(); // 停止线程 delete pair.second; // 删除线程 pair.second = nullptr; // 设置为nullptr } } m_mapCalculateDBThreads.clear(); m_mapCDBUpdated.clear(); // 清空更新标志位 SPDLOG_LOGGER_INFO(m_logger, "{} 音量计算线程销毁完成", m_logBase); } /* 创建噪音检测线程 */ void CompareItemThread::createNoiseDetectThreads() { for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { CalculateThreadInfo_t threadInfo; threadInfo.compareItemInfo.mapRoad.clear(); // 清空通道信息 threadInfo.compareItemInfo.mapRoad.insert(road.nCompareRoadNum, road); // 添加当前通道 threadInfo.threadState = EThreadState::State_Inited; // 初始化线程状态 threadInfo.threadType = EThreadType::Type_CalculateDB; NoiseDetectThread* pThread = new NoiseDetectThread(threadInfo); if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建噪音检测线程失败", m_logBase); return; // 获取线程失败 } m_mapNoiseDetectThreads.insert({road.nCompareRoadNum, pThread}); } } /* 销毁噪音检测线程 */ void CompareItemThread::destroyNoiseDetectThreads() { if(m_mapNoiseDetectThreads.size() == 0) { return; // 没有噪音检测线程 } SPDLOG_LOGGER_INFO(m_logger, "{} 销毁噪音检测线程", m_logBase); for(auto& pair : m_mapNoiseDetectThreads) { if(pair.second != nullptr) { pair.second->stopThreadBlock(); // 停止线程 delete pair.second; // 删除线程 pair.second = nullptr; // 设置为nullptr } } m_mapNoiseDetectThreads.clear(); SPDLOG_LOGGER_INFO(m_logger, "{} 噪音检测线程销毁完成", m_logBase); } /* 获取噪音检测的线程 */ // bool CompareItemThread::getNoiseDetectThreads() // { // for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) // { // NoiseDetectThread* pThread = ThreadMan.getNoiseDetectThread(road.scRoadInfo, m_threadInfo.compareItemInfo.nID); // if(pThread == nullptr) // { // SPDLOG_LOGGER_ERROR(m_logger, "{} 获取噪音检测线程失败", m_logBase); // return false; // 获取线程失败 // } // /* 向噪音检测线程写入对比项通道信息 */ // pThread->startCompareItemNoiseAlarm(m_threadInfo.compareItemInfo.nID, m_threadInfo.compareItemInfo.strName, road); // m_mapNoiseDetectThreads.insert({road.nCompareRoadNum, pThread}); // } // return true; // } /* 移除噪音检测的线程 */ // void CompareItemThread::removeNoiseDetectThreads() // { // if(m_mapNoiseDetectThreads.size() == 0) // { // return; // 没有噪音检测线程 // } // SPDLOG_LOGGER_INFO(m_logger, "{} 移除噪音检测线程", m_logBase); // for(auto& pair : m_mapNoiseDetectThreads) // { // if(pair.second != nullptr) // { // ThreadMan.removeNoiseDetectThread(pair.second->getRoadInfo(), m_threadInfo.compareItemInfo.nID); // } // } // m_mapNoiseDetectThreads.clear(); // SPDLOG_LOGGER_INFO(m_logger, "{} 噪音检测线程移除完成", m_logBase); // } /** 音量包更新逻辑: 1、先从音量创建线程获取最新的音量包信息,固定n个音量包,音量包数量和发送速度有关系,一秒 最大有30个音量包(由宏定义VOLUME_INFO_NUM决定),如果每次发送3个音量包,则100ms发送一次 2、获取音量包直接从录音线程(CreateDBThread)获取,每次获取3个,循环等待所有通道获取完3个后 进行发送 3、客户端依赖音量包绘制波形,因此音量包尽可能地快速发送 检测结果更新逻辑 2、获取噪音检测线程的噪音信息和获取一致性信息的线程的结果无需关心是否是最新的 */ bool CompareItemThread::updateResultData() { /* ------------------------------------------------------------------------------------- * 先从音量计算数据中获取音量包信息,循环等待获取完成 * ------------------------------------------------------------------------------------- */ bool isAllUpdated = false; while(isAllUpdated == false) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); for(auto& pair : m_mapCDBUpdated) { if(pair.second == true) { /* 已经更新过了 */ continue; } // CalculateDBThread* pThread = m_mapCalculateDBThreads[pair.first]; // if(pThread == nullptr) // { // SPDLOG_LOGGER_ERROR(m_logger, "{} 音量计算线程失效", m_logBase); // continue; // } // /* 获取最新的音量包数据 */ // if(!pThread->getlastVolumeInfo(m_compareResult.mapRoadVolumes[pair.first])) // { // continue; // 没有获取到最新数据,继续等待 // } CreateDBThread* pThread = m_mapCreateDBThreads[pair.first]; if(pThread == nullptr) { SPDLOG_LOGGER_WARN(m_logger, "{} 获取音量创建线程失败,通道: {}", m_logBase, pair.first); continue; } /* 获取最新的音量包数据 */ if(!pThread->getLatestRealTimeResult(m_mapRealTimeData[pair.first], 3)) { // SPDLOG_LOGGER_WARN(m_logger, "{} 获取最新音量值失败,通道: {}", m_logBase, pair.first); continue; } pair.second = true; } /* 判断是否全部更新,如果没有则返回,等待下次再次获取 */ isAllUpdated = true; for(auto& pair : m_mapCDBUpdated) { if(false == pair.second) { isAllUpdated = false; // SPDLOG_LOGGER_DEBUG(m_logger, "{} 音量计算线程数据未全部更新,等待下次获取", m_logBase); break; } } } /* ------------------------------------------------------------------------------------- * 获取静音、过载、反相的结果 * ------------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------------- * 获取噪音计算的结果(从噪音对比线程中获取) * ------------------------------------------------------------------------------------- */ for(auto& pair : m_mapNoiseDetectThreads) { NoiseDetectThread* pThread = pair.second; if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 噪音检测线程失效", m_logBase); continue; // 跳过这个线程 } /* 获取最新的噪音数据,噪音报警那个标志位貌似没用到 */ m_compareResult.mapRoadVolumes[pair.first].isNoise = pThread->isNoise(); } /* ------------------------------------------------------------------------------------- * 从对比项中获取核对过后的一致性结果 * ------------------------------------------------------------------------------------- */ for(auto& pair : m_mapCompareDoubleThreads) { CompareDoubleThread* pThread = pair.second; if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 一致性对比线程失效", m_logBase); continue; } /* 获取最新的一致性结果 */ OneRoadVolume_t roadVolume; if(pThread->getlastResult(roadVolume)) { m_compareResult.mapRoadVolumes[pair.first].isConsistency = roadVolume.isConsistency; m_compareResult.mapRoadVolumes[pair.first].isNotConsistencyWarning = roadVolume.isNotConsistencyWarning; } } // for(const auto& roadVolume : m_compareResult.mapRoadVolumes) // { // const OneRoadVolume_t& volume = roadVolume.second; // for(int i = 0; i < VOLUME_INFO_NUM; ++i) // { // fmt::print("LDB:{}, RDB:{}\n", volume.vecleftDB[i], volume.vecrightDB[i]); // } // } return true; } /* 发送数据 */ void CompareItemThread::sendResultData() { // for(const auto& roadVolume : m_compareResult.mapRoadVolumes) // { // const OneRoadVolume_t& volume = roadVolume.second; // for(int i = 0; i < VOLUME_INFO_NUM; ++i) // { // fmt::print("LDB:{}, RDB:{}\n", volume.vecleftDB[i], volume.vecrightDB[i]); // } // } if(m_pFromMQTT == nullptr) { SPDLOG_LOGGER_WARN(m_logger, "{} MQTT连接未初始化,无法发送数据", m_logBase); return; } if(m_pFromMQTT->connectState() != QMQTT::ConnectionState::STATE_CONNECTED) { SPDLOG_LOGGER_WARN(m_logger, "{} MQTT连接未成功,无法发送数据", m_logBase); return; } /* 生成json数据 */ QByteArray jsonData; if(!generateMQTTJsonData(m_compareResult, jsonData)) { SPDLOG_LOGGER_WARN(m_logger, "{} 生成音量包 JSON数据失败", m_logBase); return; } /* 发送到mqtt中 */ int errorCode = 0; if(!m_pFromMQTT->sendMessage(m_pubTopic, jsonData, 0, errorCode)) { SPDLOG_LOGGER_ERROR(m_logger, "{} 发送音量包数据到 {} 失败,错误代码: {}", m_logBase, m_pubTopic.toStdString(), errorCode); }else { SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送音量包数据到 {} 成功", m_logBase, m_pubTopic.toStdString()); } } /* 清除标志更新位 */ void CompareItemThread::clearUpdateFlags() { for(auto& pair : m_mapCDBUpdated) { pair.second = false; // 清除更新标志位 } } /* 生成发送至MQTT的JSON数据 */ bool CompareItemThread::generateMQTTJsonData(const CompareResult_t& compareResult, QByteArray& jsonData) { try { /* 生成基础信息 */ nJson json0; json0["compareItem_id"] = compareResult.compareItemID; json0["compareItem_name"] = compareResult.compareItemName.c_str(); json0["date_time"] = compareResult.dateTime.toString("yyyy-MM-dd hh:mm:ss").toStdString(); json0["is_client_alarm"] = compareResult.isClientAlarm; for(const auto& roadVolume : compareResult.mapRoadVolumes) { const int roadNum = roadVolume.first; nJson json1; json1["soundCard_id"] = roadVolume.second.roadInfo.scRoadInfo.strSoundCardID.toStdString(); /* 声卡id和声卡通道id */ json1["soundCard_road_id"] = roadVolume.second.roadInfo.scRoadInfo.roadInfo.nRoadNum; /* 对比项通道编号和名称 */ json1["item_road_num"] = roadNum; json1["item_road_name"] = roadVolume.second.roadInfo.strCompareRoadName.toStdString(); json1["similarity"] = roadVolume.second.similarity; json1["is_silence"] = roadVolume.second.isSilence; json1["is_overload"] = roadVolume.second.isOverload; json1["is_reversed"] = roadVolume.second.isReversed; json1["is_noise"] = roadVolume.second.isNoise; json1["is_noise_warning"] = roadVolume.second.isNoiseWarning; json1["is_consistency"] = roadVolume.second.isConsistency; json1["is_not_consistency_warning"] = roadVolume.second.isNotConsistencyWarning; /* 添加音量包信息 */ auto& listRTData = m_mapRealTimeData[roadNum]; for(const auto& db : listRTData) { json1["left_db_array"].push_back(db.leftDB); json1["right_db_array"].push_back(db.rightDB); // fmt::print("left_db_array: {}\n", db); } listRTData.clear(); // json1["left_real_time_db"] = m_mapRealTimeData[roadNum].leftDB; // json1["right_real_time_db"] = m_mapRealTimeData[roadNum].rightDB; /* 添加到基础信息中 */ json0["road_volumes"].push_back(json1); } /* 转换为字符串 */ jsonData.clear(); jsonData = QByteArray::fromStdString(json0.dump()); }nJsonCatch return true; } /* 检查是否在检测时间段内,更新其他检测功能的检测时间,对比项是否启用由外部控制 */ bool CompareItemThread::checkDetectPeriod() { QDateTime currentTime = QDateTime::currentDateTime(); /* 2秒更新一次一致性检测时间 */ if(m_lastDetectPeriodUpdateTime.secsTo(currentTime) < 2) { return true; } m_lastDetectPeriodUpdateTime = currentTime; std::lock_guard lock(m_mutexDetectPeriod); int currentWeekday = currentTime.date().dayOfWeek(); // 获取当前星期几,1-7表示周日到周六 QTime allDayTime; allDayTime.setHMS(0, 0, 0); /* 判断时段 */ bool isInDetectPeriod = false; for(const auto& period : m_detectPeriod.listDetect) { /* 先判断是否有符合的周几 */ if(static_cast(period.weekType) == currentWeekday) { /* 判断一种特殊情况,开始时间和结束时间都是“00:00:00”,这个是全天都检测 */ if(period.timeStart == allDayTime && period.timeEnd == allDayTime) { isInDetectPeriod = true; break; } /* 判断当前时间是否在检测时段内 */ if(currentTime.time() >= period.timeStart && currentTime.time() <= period.timeEnd) { isInDetectPeriod = true; break; } } } /* 再判断是否在非检测时间段内 */ for(const auto& period : m_detectPeriod.listNoDetect) { /* 先判断日期 */ if(period.date == currentTime.date()) { /* 判断当前时间是否在非检测时段内 */ if(currentTime.time() >= period.timeStart && currentTime.time() <= period.timeEnd) { isInDetectPeriod = false; // 在非检测时段内,不进行检测 break; } } } /* 应用于对比项检测,对比项不设置日期不会进行检测 */ for(auto& pThread : m_mapCompareDoubleThreads) { pThread.second->setInDetectPeriod(isInDetectPeriod); } /* 判断是否应用于静音、过载、反相检测 */ for(auto& pThread : m_mapCalculateDBThreads) { if(m_detectPeriod.isApplySlient) { pThread.second->setSilenceInDetectPeriod(isInDetectPeriod); }else { /* 没有应用于静音检测,就一直检测 */ pThread.second->setSilenceInDetectPeriod(true); } if(m_detectPeriod.isApplyOverload) { pThread.second->setOverloadInDetectPeriod(isInDetectPeriod); } else { /* 没有应用于过载检测,就一直检测 */ pThread.second->setOverloadInDetectPeriod(true); } if(m_detectPeriod.isApplyPhase) { pThread.second->setPhaseInDetectPeriod(isInDetectPeriod); }else { /* 没有应用于反相检测,就一直检测 */ pThread.second->setPhaseInDetectPeriod(true); } } /* 应用于噪音检测 */ for(auto& pThread : m_mapNoiseDetectThreads) { if(m_detectPeriod.isApplyNoise) { pThread.second->setInDetectPeriod(isInDetectPeriod); }else { /* 没有应用于噪音检测,就一直检测 */ pThread.second->setInDetectPeriod(true); } } return true; }