#include "CompareItemThread.h" #include "FromMQTT.h" #include "GlobalInfo.h" #include "ThreadManager.h" #include "CalculateDBThread.h" #include "NoiseDetectThread.h" #include "CompareDoubleThread.h" #include "ThreadPool.h" #include "commonDefine.h" #include "spdlog.h" #include #include CompareItemThread::CompareItemThread(CalculateThreadInfo_t& threadInfo) : BaseCalculateThread(threadInfo) { } CompareItemThread::~CompareItemThread() { } /* 线程功能函数 */ void CompareItemThread::task() { m_logBase = fmt::format("对比项: {}", m_threadInfo.compareItemInfo.strName.toStdString()); SPDLOG_LOGGER_INFO(m_logger, "----------------------------------------------------------------"); SPDLOG_LOGGER_INFO(m_logger, "{} 线程开始运行, 对比通道: ", m_logBase); for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { SPDLOG_LOGGER_INFO(m_logger, " 通道名称: {}, 通道编号: {}, 声卡通道: {}:{}", road.strCompareRoadName.toStdString(), road.nCompareRoadNum, road.scRoadInfo.strSoundCardName.toStdString(), road.scRoadInfo.roadInfo.nRoadNum); } SPDLOG_LOGGER_INFO(m_logger, "----------------------------------------------------------------"); /* 测试录音通道用 */ // for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) // { // ThreadMan.createRecordThread(road.scRoadInfo, m_threadInfo.compareItemInfo.nID); // } // while(true) // { // std::this_thread::sleep_for(std::chrono::milliseconds(100)); // } /* 初始化数据 */ if(!initData()) { SPDLOG_LOGGER_ERROR(m_logger, "{} 初始化数据失败", m_logBase); return; } while (m_isRunning) { /* 睡眠100ms */ std::this_thread::sleep_for(std::chrono::milliseconds(50)); /* ------------------------------------------------------------------------------------- * 更新对比项信息 * ------------------------------------------------------------------------------------- */ if(updateThreadInfoInternal()) { SPDLOG_LOGGER_INFO(m_logger, "{} 暂停对比检测,更新对比项信息"); m_threadInfo.compareItemInfo = m_threadInfoNew.compareItemInfo; initData(); SPDLOG_LOGGER_INFO(m_logger, "{} 更新对比项信息完成,继续检测对比"); } /* ------------------------------------------------------------------------------------- * 更新数据 * ------------------------------------------------------------------------------------- */ if(!updateResultData()) { continue; } /* ------------------------------------------------------------------------------------- * 处理数据,将报警信息给写报警数据的线程 * ------------------------------------------------------------------------------------- */ processAlarmData(); /* ------------------------------------------------------------------------------------- * 将音量包数据发送到MQTT中 * ------------------------------------------------------------------------------------- */ sendResultData(); /* 清除标志位 */ clearUpdateFlags(); // SPDLOG_LOGGER_WARN(m_logger, "{} 发送对比项数据到MQTT中", m_logBase); } /* 清理数据 */ clearData(); SPDLOG_LOGGER_INFO(m_logger, "{} 线程结束运行", m_logBase); } /* 初始化数据 */ bool CompareItemThread::initData() { /* 创建录音通道线程 */ for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { ThreadMan.createRecordThread(road.scRoadInfo, m_threadInfo.compareItemInfo.nID); } /* 创建计算音量报警信息的线程指针 */ destroyCalculateDBThreads(); // 清理之前的线程 createCalculateDBThreads(); /* 创建两个对比线程,主通道是第一个通道,其他都需要和主通道进行对比 */ destroyCompareThreads(); createCompareThreads(); /* 获取计算噪音的线程 */ removeNoiseDetectThreads(); // 清理之前的噪音检测线程 getNoiseDetectThreads(); /* 初始化存储结果的数据结构 */ m_compareResult = CompareResult_t(); m_compareResult.compareItemID = m_threadInfo.compareItemInfo.nID; m_compareResult.compareItemName = m_threadInfo.compareItemInfo.strName.toStdString(); m_compareResult.dateTime = QDateTime::currentDateTime(); m_compareResult.isClientAlarm = false; // 默认不报警 m_compareResult.mapRoadVolumes.clear(); // 清空之前的数据 for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { OneRoadVolume_t oneRoadVolume; oneRoadVolume.roadInfo = road; // 设置通道信息 oneRoadVolume.dateTime = QDateTime::currentDateTime(); // 初始化时间 m_compareResult.mapRoadVolumes.insert({road.nCompareRoadNum, oneRoadVolume}); } m_mapCDBUpdated.clear(); for(auto it : m_threadInfo.compareItemInfo.mapRoad) { m_mapCDBUpdated.insert({it.nCompareRoadNum, false}); // 初始化更新标志位为false } /* 初始化报警信息 */ m_mapAlarmSilence.clear(); m_mapAlarmOverload.clear(); m_mapAlarmPhase.clear(); for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { m_mapAlarmSilence.insert({road.nCompareRoadNum, AlarmInfo_t()}); m_mapAlarmOverload.insert({road.nCompareRoadNum, AlarmInfo_t()}); m_mapAlarmPhase.insert({road.nCompareRoadNum, AlarmInfo_t()}); } m_mapAlarmSilenceLast.clear(); m_mapAlarmOverloadLast.clear(); m_mapAlarmPhaseLast.clear(); for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { m_mapAlarmSilenceLast.insert({road.nCompareRoadNum, AlarmInfo_t()}); m_mapAlarmOverloadLast.insert({road.nCompareRoadNum, AlarmInfo_t()}); m_mapAlarmPhaseLast.insert({road.nCompareRoadNum, AlarmInfo_t()}); } /* 登陆MQTT */ m_pubTopic = QString("%1/%2").arg(GInfo.mqttPubTopicDB()).arg(QString::number(m_threadInfo.compareItemInfo.nID)); m_pFromMQTT = new FromMQTT; m_pFromMQTT->setIPAndPort(GInfo.mqttIP(), GInfo.mqttPort()); m_pFromMQTT->connectToServer(); /* 等待连接成功 */ auto startTime = std::chrono::steady_clock::now(); // 记录开始时间 while(true) { if(m_pFromMQTT->connectState() == QMQTT::ConnectionState::STATE_CONNECTED) { break; // 连接成功 } /* 超过10秒还没有连接成功,返回失败 */ if(std::chrono::steady_clock::now() - startTime > std::chrono::seconds(10)) { SPDLOG_LOGGER_ERROR(m_logger, "{} 连接MQTT服务器超时", m_logBase); if(m_pFromMQTT != nullptr) { delete m_pFromMQTT; // 删除MQTT对象 m_pFromMQTT = nullptr; // 设置为nullptr } return false; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } SPDLOG_LOGGER_INFO(m_logger, "☆ {} 音量包订阅主题: {}", m_logBase, m_pubTopic.toStdString()); return true; } /* 清理数据 */ void CompareItemThread::clearData() { /* 停止所有的比对线程 */ for(auto& pair : m_mapCompareDoubleThreads) { if(pair.second != nullptr) { pair.second->stopThreadBlock(); delete pair.second; pair.second = nullptr; } } m_mapCompareDoubleThreads.clear(); /* 移除使用到的录音通道 */ for(auto& it : m_threadInfo.compareItemInfo.mapRoad) { SoundCardRoadInfo_t roadInfo = it.scRoadInfo; if(!ThreadMan.removeRecordThread(roadInfo, m_threadInfo.compareItemInfo.nID)) { SPDLOG_LOGGER_ERROR(m_logger, "{} 移除录音通道 {}:{} 失败", m_logBase, roadInfo.strSoundCardName.toStdString(), roadInfo.roadInfo.nRoadNum); } } if(m_pFromMQTT != nullptr) { delete m_pFromMQTT; // 删除MQTT对象 m_pFromMQTT = nullptr; // 设置为nullptr } } /* 创建两个对比线程,主通道是第一个通道,其他都需要和主通道进行对比 */ bool CompareItemThread::createCompareThreads() { auto it = m_threadInfo.compareItemInfo.mapRoad.begin(); auto mainRoad = it.value(); // 主通道信息 it++; // 移动到下一个通道 // uint64_t size = m_threadInfo.compareItemInfo.mapRoad.size(); for(; it != m_threadInfo.compareItemInfo.mapRoad.end(); it++) { CalculateThreadInfo_t compareThreadInfo; compareThreadInfo.compareItemInfo = m_threadInfo.compareItemInfo; compareThreadInfo.compareItemInfo.mapRoad.clear(); // 清空通道信息 compareThreadInfo.compareItemInfo.mapRoad.insert(mainRoad.nCompareRoadNum, mainRoad); // 添加主通道 compareThreadInfo.compareItemInfo.mapRoad.insert(it.key(), it.value()); // 添加当前通道 compareThreadInfo.threadState = EThreadState::State_Inited; CompareDoubleThread* pThread = new CompareDoubleThread(compareThreadInfo); if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建对比线程 {} - {} 失败", m_logBase, mainRoad.strCompareRoadName.toStdString(), it.value().strCompareRoadName.toStdString()); return false; } m_mapCompareDoubleThreads.insert({it.key(), pThread}); // 保存线程指针 /* 开始运行 */ CPPTP.add_task(&CompareDoubleThread::threadTask, pThread); } return true; } /* 销毁两两对比线程 */ void CompareItemThread::destroyCompareThreads() { if(m_mapCompareDoubleThreads.size() == 0) { return; // 没有对比线程 } SPDLOG_LOGGER_INFO(m_logger, "{} 销毁对比线程", m_logBase); for(auto& pair : m_mapCompareDoubleThreads) { if(pair.second != nullptr) { pair.second->stopThreadBlock(); // 停止线程 delete pair.second; // 删除线程 pair.second = nullptr; // 设置为nullptr } } m_mapCompareDoubleThreads.clear(); m_mapCDBUpdated.clear(); // 清空更新标志位 SPDLOG_LOGGER_INFO(m_logger, "{} 对比线程销毁完成", m_logBase); } /* 创建计算音量报警的线程 */ bool CompareItemThread::createCalculateDBThreads() { for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { CalculateThreadInfo_t threadInfo; threadInfo.compareItemInfo.mapRoad.clear(); // 清空通道信息 threadInfo.compareItemInfo.mapRoad.insert(road.nCompareRoadNum, road); // 添加当前通道 threadInfo.threadState = EThreadState::State_Inited; // 初始化线程状态 threadInfo.threadType = EThreadType::Type_CalculateDB; CalculateDBThread* pThread = new CalculateDBThread(threadInfo); if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 创建音量计算线程失败", m_logBase); // return false; // 获取线程失败 } CPPTP.add_task(&CalculateDBThread::threadTask, pThread); m_mapCalculateDBThreads.insert({road.nCompareRoadNum, pThread}); // 保存线程指针 } return true; } /* 销毁音量计算的线程 */ void CompareItemThread::destroyCalculateDBThreads() { if(m_mapCalculateDBThreads.size() == 0) { return; // 没有音量计算线程 } SPDLOG_LOGGER_INFO(m_logger, "{} 销毁音量计算线程", m_logBase); for(auto& pair : m_mapCalculateDBThreads) { if(pair.second != nullptr) { pair.second->stopThreadBlock(); // 停止线程 delete pair.second; // 删除线程 pair.second = nullptr; // 设置为nullptr } } m_mapCalculateDBThreads.clear(); m_mapCDBUpdated.clear(); // 清空更新标志位 SPDLOG_LOGGER_INFO(m_logger, "{} 音量计算线程销毁完成", m_logBase); } /* 获取噪音检测的线程 */ bool CompareItemThread::getNoiseDetectThreads() { for(const auto& road : m_threadInfo.compareItemInfo.mapRoad) { NoiseDetectThread* pThread = ThreadMan.getNoiseDetectThread(road.scRoadInfo, m_threadInfo.compareItemInfo.nID); if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 获取噪音检测线程失败", m_logBase); return false; // 获取线程失败 } m_mapNoiseDetectThreads.insert({road.nCompareRoadNum, pThread}); } return true; } /* 移除噪音检测的线程 */ void CompareItemThread::removeNoiseDetectThreads() { if(m_mapNoiseDetectThreads.size() == 0) { return; // 没有噪音检测线程 } SPDLOG_LOGGER_INFO(m_logger, "{} 移除噪音检测线程", m_logBase); for(auto& pair : m_mapNoiseDetectThreads) { if(pair.second != nullptr) { ThreadMan.removeNoiseDetectThread(pair.second->getRoadInfo(), m_threadInfo.compareItemInfo.nID); } } m_mapNoiseDetectThreads.clear(); SPDLOG_LOGGER_INFO(m_logger, "{} 噪音检测线程移除完成", m_logBase); } /** * @brief 更新数据 * 更新数据逻辑: * 1、先从音量计算线程获取最新的音量包信息,如果没有全部更新,则等待下次获取,不进行后面的操作 * 2、获取噪音检测线程的噪音信息和获取一致性信息的线程的结果无需关系是否是最新的 * */ bool CompareItemThread::updateResultData() { /* ------------------------------------------------------------------------------------- * 先从音量计算数据中获取音量包信息和报警信息(静音、过载、反相) * ------------------------------------------------------------------------------------- */ for(auto& pair : m_mapCDBUpdated) { if(pair.second == true) { /* 已经更新过了 */ continue; } CalculateDBThread* pThread = m_mapCalculateDBThreads[pair.first]; if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 音量计算线程失效", m_logBase); continue; } /* 获取最新的音量数据 */ if(!pThread->getlastVolumeInfo(m_compareResult.mapRoadVolumes[pair.first])) { continue; // 没有获取到最新数据,继续等待 } /* 更新报警信息 */ m_mapAlarmSilence[pair.first] = pThread->getAlarm(EAlarmType::EAT_Silent); m_mapAlarmOverload[pair.first] = pThread->getAlarm(EAlarmType::EAT_Overload); m_mapAlarmPhase[pair.first] = pThread->getAlarm(EAlarmType::EAT_Reversed); pair.second = true; // 设置更新标志位为true } /* 判断是否全部更新,如果没有则返回,等待下次再次获取 */ for(auto& pair : m_mapCDBUpdated) { if(false == pair.second) { // SPDLOG_LOGGER_DEBUG(m_logger, "{} 音量计算线程数据未全部更新,等待下次获取", m_logBase); return false; } } /* ------------------------------------------------------------------------------------- * 获取噪音计算的结果 * ------------------------------------------------------------------------------------- */ for(auto& pair : m_mapNoiseDetectThreads) { NoiseDetectThread* pThread = pair.second; if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 噪音检测线程失效", m_logBase); continue; // 跳过这个线程 } /* 获取最新的噪音数据,噪音报警那个标志位貌似没用到 */ m_compareResult.mapRoadVolumes[pair.first].isNoise = pThread->isNoise(); } /* ------------------------------------------------------------------------------------- * 从对比项中获取核对过后的一致性结果 * ------------------------------------------------------------------------------------- */ for(auto& pair : m_mapCompareDoubleThreads) { CompareDoubleThread* pThread = pair.second; if(pThread == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "{} 一致性对比线程失效", m_logBase); continue; } /* 获取最新的一致性结果 */ OneRoadVolume_t roadVolume; if(pThread->getlastVolumeInfo(roadVolume)) { m_compareResult.mapRoadVolumes[pair.first].isConsistency = roadVolume.isConsistency; m_compareResult.mapRoadVolumes[pair.first].isNotConsistencyWarning = roadVolume.isNotConsistencyWarning; } } return true; } /* 处理报警数据,主要是和之前的数据尽情对比,是否是一样的 */ void CompareItemThread::processAlarmData() { m_listAlarm.clear(); /* 处理静音报警数据 */ for(auto& pair : m_mapAlarmSilence) { auto& nowAlarm = pair.second; auto& lastAlarm = m_mapAlarmSilenceLast[pair.first]; if(nowAlarm.isAlarm) { if(lastAlarm == nowAlarm) { continue; // 和上次的报警信息一样,不需要处理 }else { nowAlarm.CompareItemID = m_threadInfo.compareItemInfo.nID; nowAlarm.strCompareItemName = m_threadInfo.compareItemInfo.strName.toStdString(); m_listAlarm.push_back(nowAlarm); m_mapAlarmSilenceLast[pair.first] = nowAlarm; } } } /* 处理过载报警数据 */ for(auto& pair : m_mapAlarmOverload) { auto& nowAlarm = pair.second; auto& lastAlarm = m_mapAlarmOverloadLast[pair.first]; if(nowAlarm.isAlarm) { if(lastAlarm == nowAlarm) { continue; // 和上次的报警信息一样,不需要处理 }else { nowAlarm.CompareItemID = m_threadInfo.compareItemInfo.nID; nowAlarm.strCompareItemName = m_threadInfo.compareItemInfo.strName.toStdString(); m_listAlarm.push_back(nowAlarm); m_mapAlarmOverloadLast[pair.first] = nowAlarm; } } } /* 处理反相报警数据 */ for(auto& pair : m_mapAlarmPhase) { auto& nowAlarm = pair.second; auto& lastAlarm = m_mapAlarmPhaseLast[pair.first]; if(nowAlarm.isAlarm) { if(lastAlarm == nowAlarm) { continue; // 和上次的报警信息一样,不需要处理 }else { nowAlarm.CompareItemID = m_threadInfo.compareItemInfo.nID; nowAlarm.strCompareItemName = m_threadInfo.compareItemInfo.strName.toStdString(); m_listAlarm.push_back(nowAlarm); m_mapAlarmPhaseLast[pair.first] = nowAlarm; } } } /* 将报警列表写入到处理报警数据的线程中 */ } /* 发送数据 */ void CompareItemThread::sendResultData() { /* 生成json数据 */ QByteArray jsonData; if(!generateMQTTJsonData(m_compareResult, jsonData)) { SPDLOG_LOGGER_WARN(m_logger, "{} 生成音量包 JSON数据失败", m_logBase); return; } /* 发送到mqtt中 */ int errorCode = 0; if(!m_pFromMQTT->sendMessage(m_pubTopic, jsonData, 0, errorCode)) { SPDLOG_LOGGER_ERROR(m_logger, "{} 发送音量包数据到 {} 失败,错误代码: {}", m_logBase, m_pubTopic.toStdString(), errorCode); }else { SPDLOG_LOGGER_DEBUG(m_logger, "{} 发送音量包数据到 {} 成功", m_logBase, m_pubTopic.toStdString()); } } /* 清除标志更新位 */ void CompareItemThread::clearUpdateFlags() { for(auto& pair : m_mapCDBUpdated) { pair.second = false; // 清除更新标志位 } } /* 生成发送至MQTT的JSON数据 */ bool CompareItemThread::generateMQTTJsonData(const CompareResult_t& compareResult, QByteArray& jsonData) { try { /* 生成基础信息 */ nJson json0; json0["compareItem_id"] = compareResult.compareItemID; json0["compareItem_name"] = compareResult.compareItemName.c_str(); json0["date_time"] = compareResult.dateTime.toString("yyyy-MM-dd hh:mm:ss").toStdString(); json0["is_client_alarm"] = compareResult.isClientAlarm; for(const auto& roadVolume : compareResult.mapRoadVolumes) { nJson json1; json1["soundCard_id"] = roadVolume.second.roadInfo.scRoadInfo.strSoundCardID.toStdString(); /* 声卡id和声卡通道id */ json1["soundCard_road_id"] = roadVolume.second.roadInfo.scRoadInfo.roadInfo.nRoadNum; /* 对比项通道编号和名称 */ json1["item_road_num"] = roadVolume.second.roadInfo.nCompareRoadNum; json1["item_road_name"] = roadVolume.second.roadInfo.strCompareRoadName.toStdString(); json1["similarity"] = roadVolume.second.similarity; json1["is_silence"] = roadVolume.second.isSilence; json1["is_overload"] = roadVolume.second.isOverload; json1["is_reversed"] = roadVolume.second.isReversed; json1["is_noise"] = roadVolume.second.isNoise; json1["is_noise_warning"] = roadVolume.second.isNoiseWarning; json1["is_consistency"] = roadVolume.second.isConsistency; json1["is_not_consistency_warning"] = roadVolume.second.isNotConsistencyWarning; json1["left_real_time_db"] = roadVolume.second.leftRealTimeDB; json1["right_real_time_db"] = roadVolume.second.rightRealTimeDB; /* 添加音量包信息 */ for(const auto& db : roadVolume.second.vecleftDB) { json1["left_db_array"].push_back(db); } for(const auto& db : roadVolume.second.vecrightDB) { json1["right_db_array"].push_back(db); } /* 添加到基础信息中 */ json0["road_volumes"].push_back(json1); } /* 转换为字符串 */ jsonData.clear(); jsonData = QByteArray::fromStdString(json0.dump()); }nJsonCatch return true; }