#include "SPAServer.h" #include "spdlog/spdlog.h" #include #include "ThreadPool/ThreadPool.h" #include "GlobalInfo/GlobalVariable.h" #include "GlobalInfo/GlobalConfig.h" #include "UniversalFunc.h" #include "FuncOrdinary.h" #include "FuncIllegalInvasion.h" #include SPAServer::SPAServer() { m_logger = spdlog::get("SPAServer"); if(m_logger == nullptr) { SPDLOG_ERROR("APAServer logger is nullptr"); return; } /* 读取全局的配置文件 */ QString strConfigFile = QCoreApplication::applicationDirPath() + "/config.ini"; if(!GConfig.readConfig(strConfigFile)) { /* 读取配置文件失败,直接退出程序 */ return; } GConfig.printValue(); m_threadRunning = true; /* 初始化WebAPI */ m_toEQMDataBase.initWebApi("http://192.1.3.133:31000/v6/", "", "4c2f9fc91c22dd98331e47af2e2964f4"); /* 模拟违禁品算法ID,后续需要动态调整 */ g_actionList.ActContraband = "OD210_026_005246_001-IZRTKyEx"; } SPAServer::~SPAServer() { } /* 启动服务 */ void SPAServer::startServer() { /* 添加获取基础信息的线程 */ // CPPTP.add_task(&SPAServer::threadFromSuperBrain, this); /* 测试Redis读取并解析数据线程 */ CameraThreadInfo info; info.RedisIP = "172.16.36.80"; info.RedisPort = 32222; info.RedisPWD = "Ff1z@TOFr^iwd%Ra"; info.DeviceID = 117; info.vecAction.push_back("OD210_026_005246_001-IZRTKyEx"); g_actionList.ActContraband = "OD210_026_005246_001-IZRTKyEx"; // CPPTP.add_task(&SPAServer::threadFromRedis, this, info); // threadFromRedis(info); } /** * @brief 从基础平台获取算法信息和设备信息的线程函数 * 从基础平台获取算法信息和设备信息,然后更新到EQM数据库所对应的表中 * */ void SPAServer::threadFromSuperBrain() { SPDLOG_LOGGER_INFO(m_logger, "开启 fromSuperBrainThread 线程, 获取算法和设备信息"); /* 创建变量 */ std::vector vecAlgNewInfo; std::vector vecDevNewInfo; /* 获取一次token,后续失效了再获取 */ m_fromSuperBrain.getToken(); while (m_threadRunning) { SPDLOG_LOGGER_INFO(m_logger, "刷新算法和设备信息"); /* 先更新数据库的信息,防止从其他地方更改了数据库,这里没有刷新本地缓存 */ m_toEQMDataBase.getAlgorithmInfo(m_vecEqmAlgInfo); m_toEQMDataBase.getDeviceInfo(m_vecEqmDevInfo); m_toEQMDataBase.getDeviceAlgorithmInfo(m_vecEqmDevInfo, m_listDevIDDelete); /* 从超脑获取基础信息 */ m_fromSuperBrain.getTaskTypeList(vecAlgNewInfo); m_fromSuperBrain.getDeviceList(vecDevNewInfo); /* 处理算法信息 */ bool algIsUpdate = processAlgorithmInfo(vecAlgNewInfo); /* 处理设备信息 */ bool devIsUpdate = processDeviceInfo(vecDevNewInfo); vecAlgNewInfo.clear(); vecDevNewInfo.clear(); /* 更新算法详细信息 */ m_mutexActionInfo.lock(); m_toEQMDataBase.getActionInfo(m_listActionInfo); m_mutexActionInfo.unlock(); /* 更新通道名称和摄像机名称信息 */ std::map mapChannelName; m_toEQMDataBase.getChannelInfo(mapChannelName); GConfig.setChannelInfo(mapChannelName); std::map mapCameraName; m_toEQMDataBase.getCameraInfo(mapCameraName); GConfig.setCameraInfo(mapCameraName); /* 10秒更新一次 */ std::this_thread::sleep_for(std::chrono::seconds(10)); } SPDLOG_LOGGER_INFO(m_logger, "退出 fromSuperBrainThread 线程"); } /* 处理算法信息,返回值为true,说明有改变,需要重新读取 */ bool SPAServer::processAlgorithmInfo(std::vector vecNewAlgInfo) { std::vector vecAlgUpdate; std::vector vecAlgDelete; /* 对比数据库表格信息,这里只有插入和删除,没有更新 */ compareAlgorithmInfo(vecNewAlgInfo, vecAlgUpdate, vecAlgDelete); /* 更新数据库,先删除,再写入刷新 */ bool isUpdate = false; if(vecAlgDelete.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "删除算法信息"); m_toEQMDataBase.deleteAlgorithmInfo(vecAlgDelete); isUpdate = true; } if(vecAlgUpdate.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "写入算法信息"); m_toEQMDataBase.writeAlgorithmInfo(vecAlgUpdate); isUpdate = true; } return isUpdate; } /** * @brief 处理设备信息 * * @param vecNewDevInfo 传入新获取到的值 * @return true 需要重新读取数据库,获取新的数据 * @return false 无需读取数据库 */ bool SPAServer::processDeviceInfo(std::vector vecNewDevInfo) { std::vector vecDevInsert; std::vector vecDevUpdate; std::vector vecDevDelete; /*------------------------------------------------------------------------- ****** 这里只对比设备信息,不对比设备的算法信息,算法信息在下面单独对比 ******* *------------------------------------------------------------------------*/ /* 如果本地缓存没有数据,那么就全部插入 */ if(m_vecEqmDevInfo.size() > 0) { for(auto& DevInfo : vecNewDevInfo) { bool isExist = false; for(auto& it0 : m_vecEqmDevInfo) { if(DevInfo.DeviceID == it0.DeviceID) { isExist = true; /* 对比其他项是否相等,不相等就更新 */ if(DevInfo == it0) { continue; }else { vecDevUpdate.push_back(DevInfo); } break; } } if(!isExist) { vecDevInsert.push_back(DevInfo); } } }else { vecDevInsert = vecNewDevInfo; } /* 获取删除列表 */ if(vecNewDevInfo.size() > 0) { bool isExist = false; for(const auto& it : m_vecEqmDevInfo) { isExist = false; for(const auto& it0 : vecNewDevInfo) { if(it.DeviceID == it0.DeviceID) { isExist = true; break; } } if(!isExist) { vecDevDelete.push_back(it); } } }else { vecDevDelete = m_vecEqmDevInfo; } bool isUpdate = false; /* 先删除多余的数据 */ if(vecDevDelete.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "删除设备信息, 表: tActionCamer"); m_toEQMDataBase.deleteDeviceInfo(vecDevDelete); isUpdate = true; } /* 更新数据 */ if(vecDevUpdate.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "更新设备信息, 表: tActionCamer"); m_toEQMDataBase.updateDeviceInfo(vecDevUpdate); isUpdate = true; } /* 插入数据 */ if(vecDevInsert.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "插入设备信息, 表: tActionCamer"); m_toEQMDataBase.insertDeviceInfo(vecDevInsert); isUpdate = true; } /*------------------------------------------------------------------------- ************* 处理设备和算子关联的表格,单独对比设备的算法信息 ************* *------------------------------------------------------------------------*/ /* 插入新的设备信息 */ if(vecDevInsert.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "插入设备和算法关联表(tActionCamer)"); m_toEQMDataBase.insertDeviceAlgorithmInfo(vecDevInsert); isUpdate = true; } vecDevUpdate.clear(); /* 对比现有的设备是否需要更新算法 */ compareDeviceAlgorithmInfo(vecNewDevInfo, vecDevUpdate); if(vecDevUpdate.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "更新设备和算法关联表(tActionCamer), 更新设备数目:{}", vecDevUpdate.size()); m_toEQMDataBase.updateDeviceAlgorithmInfo(vecDevUpdate); } /* 删除tActionCamer表中消失的设备信息 */ if(m_listDevIDDelete.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "删除消失的设备关联的算法(tActionCamer)"); m_toEQMDataBase.deleteDeviceAlgorithmInfo(m_listDevIDDelete); isUpdate = true; } return isUpdate; } /* 对比现有的数据和新获取到的数据,取出要删除和添加的数据 */ void SPAServer::compareAlgorithmInfo(const std::vector& vecNewInfo, std::vector& vecAlgUpdate, std::vector& vecAlgDelete) { /* 取出要添加的,如果本地缓存是0,那么全部都要添加 */ if(m_vecEqmAlgInfo.size() > 0) { for(const auto& it : vecNewInfo) { bool isExist = false; for(const auto& it0 : m_vecEqmAlgInfo) { /* 如果存在就退出循环 */ if(it.ActionID == it0.ActionID) { isExist = true; break; } } if(!isExist) { vecAlgUpdate.push_back(it); } } }else { vecAlgUpdate = vecNewInfo; } /* 取出要删除的,如果新的数据是0,那么全部都要删除 */ if(vecNewInfo.size() > 0) { bool isExist = false; for(const auto& it : m_vecEqmAlgInfo) { isExist = false; for(const auto& it0 : vecNewInfo) { if(it.ActionID == it0.ActionID) { isExist = true; break; } } if(!isExist) { vecAlgDelete.push_back(it); } } }else { vecAlgDelete = m_vecEqmAlgInfo; } } /** * @brief 对比设备和算法关联表是否需要更新 * 对比规则: * 1、这里只对比已有的设备ID,需要删除的ID在获取到tActionCamer表是就已经取出来了 * 2、如果设备ID相等,那么进一步对比算法信息是否相等 * 3、如果设备ID相等,但是算法信息数目不相等,那么直接加入更新列表 * 4、如果设备ID相等,算法信息数目相等,进一步对比算法信息 * * @param vecNewInfo * @param vecDevUpdate */ void SPAServer::compareDeviceAlgorithmInfo(const std::vector& vecNewInfo, std::vector& vecDevUpdate) { vecDevUpdate.clear(); for(const auto& it0 : vecNewInfo) { for(const auto& it1 : m_vecEqmDevInfo) { if(it0.DeviceID == it1.DeviceID) { /* 设备的算法信息数目不相等,直接加入更新列表 */ if(it0.vecAlgorithmInfo.size() != it1.vecAlgorithmInfo.size()) { vecDevUpdate.push_back(it0); break; } /* 设备的算法信息数目相等,进一步对比算法信息 */ bool isEquality = true; for(const auto& it2 : it0.vecAlgorithmInfo) { bool isEq2 = false; for(const auto& it3 : it1.vecAlgorithmInfo) { /* 这里只对比算法ID */ if(it2.ActionID != it3.ActionID) { continue; }else { isEq2 = true; break; } } if(!isEq2) { isEquality = false; break; } } if(!isEquality) { vecDevUpdate.push_back(it0); break; } } } } } /** * @brief 从Redis获取数据线程函数,这个是摄像机线程 * 一个设备一个线程,这个线程的相关变量只在这个线程中使用 * (注意,这个函数未被使用,不从这里获取Redis数据) * @param info */ void SPAServer::threadFromRedis(const CameraThreadInfo& info) { SPDLOG_LOGGER_INFO(m_logger, "开启 fromRedisThread 线程,设备ID:{}", info.DeviceID); FromRedis fromRedis; fromRedis.setRedisIPAndPort(info.RedisIP, info.RedisPort); fromRedis.setRedisPassword(info.RedisPWD); if(fromRedis.connectRedis()) { SPDLOG_LOGGER_INFO(m_logger, "连接Redis成功"); }else { SPDLOG_LOGGER_ERROR(m_logger, "连接Redis失败"); return; } CameraThreadInfo threadInfo = info; // std::string strKey = "117:OD210_026_005246_001-IZRTKyEx"; /* 取出该设备的Key */ std::vector vecKey; for(const auto& it : info.vecAction) { std::string strKey = std::to_string(info.DeviceID) + ":" + it; vecKey.push_back(strKey); } std::string strRetValue; /* 进入线程循环 */ while (m_threadRunning) { /* 循环读取这个设备关联的算法信息 */ for(const auto& it : vecKey) { SPDLOG_LOGGER_INFO(m_logger, "读取Redis信息, Key: {}", it); if( !fromRedis.getRedisString(it, strRetValue) ) { continue; } SPDLOG_LOGGER_TRACE(m_logger, "Redis Value:\n{}", strRetValue); /* 解析数据 */ AlarmInfo alarmInfo; alarmInfo.ActionID = it.substr(it.find(":") + 1); /* 解析数据 */ parseRedisBaseData(strRetValue, alarmInfo); parseRedisBBoxesData(strRetValue, alarmInfo); /* 信息时间有的效性判断,主要是记录Redis数据的有效性,是否长时间没有变化,排查错误时用的 * 如果数据长时间数据不变,那么超脑那里就挂了,写入日志 */ isEventTimeVaild(alarmInfo.EventTime); /* 是否需要写入EQM数据库判断 * 人员计数和区域人员检测需要多次判断,其他的直接写入即可 */ } std::this_thread::sleep_for(std::chrono::seconds(2)); } return; } /** * @brief 分派任务的线程 1、 线程定时刷新房间和摄像机的关联关系,以及和算法Action的关联关系 2、 将算法信息加入到不同的列表中 需要多个摄像机配合的加入到 m_runListRoomActionInfo 列表 不需要多个摄像机配合的加入到 m_runListActionInfo 列表 3、 每次刷新都会清空ActionInfo或者RoomActionInfo的摄像机列表,但是不会动其他基本信息,如果是列表中没有对应的信息, 就会创建新的ActionInfo,那么RunState的状态是INIT,需要开启新的线程 如果刷新完成后,算法对应的摄像机列表为空,那么对应的线程就会停止运行,将RunState设置为STOP,本线程的下一轮 循环就会删除这个ActionInfo */ void SPAServer::threadDistribution() { SPDLOG_LOGGER_INFO(m_logger, "开启分派任务线程"); /* 房间相机关联信息 */ std::list listRC; /* 存储获取到的应用和启用时间的信息 */ std::list listAppAndTime; /* 创建连接数据库实例 */ std::shared_ptr toEQMDataBase = std::make_shared(); while (m_threadRunning) { GThreadInfo.lockRunFAI(); /* 先清理已经退出的线程所用到的Action或者RoomAction */ GThreadInfo.clearNoneFuncActionInfo(); /* 清空已经停止运行的功能类实例 */ clearNoneFuncActionInfo(); /* 创建应用信息,根据从EQM数据库读取到的配置的应用信息创建 */ toEQMDataBase->getAlarmAppInfo(listAppAndTime); for(const auto& it : listAppAndTime) { /* 创建应用信息,如果已有该应用,就更新时间 * 这里只创建应用信息块,没有对应的房间、算法信息 */ GThreadInfo.addFuncActionInfo(it); } /* 先获取EQM数据库信息,取出房间和摄像机关联信息,包括所在的频率 */ m_mutexActionInfo.lock(); toEQMDataBase->getActionInfo(m_listActionInfo); /* 将算法信息加入到不同的功能列表中,先清空功能对应的算法设备列表 */ GThreadInfo.clearActionList(); for(const auto& it : m_listActionInfo.getData()) { GThreadInfo.addActionInfo(*it); } /* 检查算法信息的状态,频率里的应用没有配置摄像机就设置为线程运行状态为停止,退出该线程 */ GThreadInfo.setNoneCameraFuncStop(); m_mutexActionInfo.unlock(); /* 开启线程 */ for(const auto& it0 : GThreadInfo.getList()) { if(it0->RunState == RunTimeState::RUN_STATE_INIT) { /* 创建实例 */ FuncBase* pFunc = createFuncInstance(*it0); if(pFunc == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "创建功能实例失败:{}", it0->strFunctionName); continue; } CPPTP.add_task(&FuncBase::thread_task, pFunc); m_listFuncBase.push_back(pFunc); } } GThreadInfo.unlockRunFAI(); /* 休眠n秒,默认应该是300秒 */ std::this_thread::sleep_for(std::chrono::seconds(GConfig.CheckSet)); } SPDLOG_LOGGER_INFO(m_logger, "分派任务线程退出"); } /** * @brief 创建任务实例的函数,根据不同的功能创建出不同的实例 * * @param info * @return FuncBase* */ FuncBase* SPAServer::createFuncInstance(FuncActionInfo& info) { if(info.appFunction == AppFunction::APP_NONE) { return nullptr; } FuncBase* pFunc = nullptr; /* 普通的功能,只用一个摄像机一个算法的功能 */ if( info.appFunction == AppFunction::APP_AllDown || info.appFunction == AppFunction::APP_NoMask || info.appFunction == AppFunction::APP_PlayPhone || info.appFunction == AppFunction::APP_Mouse || info.appFunction == AppFunction::APP_Fatigue || info.appFunction == AppFunction::APP_Contraband ) { auto tmpFunc = new FuncOrdinary(); tmpFunc->setFuncActionInfo(info); pFunc = tmpFunc; } else if(info.appFunction == AppFunction::APP_OnWork) { /* 人员在岗检测 */ } else if(info.appFunction == AppFunction::APP_Illegal) { /* 非法入侵 */ auto tmpFunc = new FuncIllegalInvasion(); tmpFunc->setFuncActionInfo(info); pFunc = tmpFunc; } else if(info.appFunction == AppFunction::APP_Regional) { /* 区域人员检测 */ } return pFunc; } /* 清理没有在运行的线程实例 */ void SPAServer::clearNoneFuncActionInfo() { for(auto it = m_listFuncBase.begin(); it != m_listFuncBase.end(); ) { if(!(*it)->getThreadRunning()) { /* 需要转换成相应的子类实例才能删除 */ if( (*it)->getApp() == AppFunction::APP_AllDown || (*it)->getApp() == AppFunction::APP_NoMask || (*it)->getApp() == AppFunction::APP_PlayPhone || (*it)->getApp() == AppFunction::APP_Mouse || (*it)->getApp() == AppFunction::APP_Fatigue || (*it)->getApp() == AppFunction::APP_Contraband ) { FuncOrdinary* p = dynamic_cast(*it); delete p; } else if((*it)->getApp() == AppFunction::APP_OnWork) { /* 人员在岗检测 */ } else if((*it)->getApp() == AppFunction::APP_Illegal) { /* 非法入侵 */ } else if((*it)->getApp() == AppFunction::APP_Regional) { /* 区域人员检测 */ } it = m_listFuncBase.erase(it); }else { ++it; } } } /** * @brief 区域人员检测,检测这个区域内的人数,不能少于多少人,不能多余多少人 * 1、报警判断条件:直播间报警:当前频率所有直播间摄像头的区域人员检测算法输出结果均大于或小于设定人 数值时,记为报警行为,直接展示报警结果 2、录播间报警:当前频率所有录播间摄像头的区域人员检测算法输出结果均大于或小于设定人数值时,记为报 警行为,直接展示报警结果 3、直播间+录播间报警:当前频率直播间人数+录播间人数大于或小于设定人数值时,记为报警行为,直接展示 报警结果 * * @param RAInfo 传入的房间ID和算法ID */ void SPAServer::threadActRegionalPersonnelDetection(FuncActionInfo* RFAInfo) { SPDLOG_LOGGER_INFO(m_logger, "开启 {} 线程,ChannelID:{} ,", RFAInfo->strFunctionName, RFAInfo->ChannelID); /* 创建读取Redis的实例 */ std::shared_ptr fromRedis = std::make_shared(); /* 创建写入数据库实例 */ std::shared_ptr toEQMDataBase = std::make_shared(); /* 局部变量,线程功能需要的信息 */ std::shared_ptr pRFAInfo = std::make_shared(); *pRFAInfo = *RFAInfo; /* 保存每个摄像机的报警信息 */ std::shared_ptr> pListAlarmInfo = std::make_shared>(); /* 报警时间段 */ std::shared_ptr> pPersonCountRuleInfo = std::make_shared>(); /* 获取报警规则信息 */ toEQMDataBase->getPersonCountRuleInfo(*pPersonCountRuleInfo); while (m_threadRunning) { std::this_thread::sleep_for(std::chrono::milliseconds(GConfig.ThreadSleepMS)); /************ 更新线程信息 ************/ GThreadInfo.updateFuncInfo(pRFAInfo.get()); if(pRFAInfo->appFunction == AppFunction::APP_NONE) { break; } /************ 读取Redis数据 ************/ pListAlarmInfo->clear(); for(const auto& RoomInfo : pRFAInfo->listRoomCamActInfo) { for(const auto& it : RoomInfo.mapCameraAction) { std::string strKey = std::to_string(it.first) + ":" + it.second; std::string strRetValue; if(!fromRedis->getRedisString(strKey, strRetValue)) { SPDLOG_LOGGER_ERROR(m_logger, "读取Redis数据失败, Key:{}", strKey); continue; } /* 解析数据 */ AlarmInfo alarmInfo; parseRedisBaseData(strRetValue, alarmInfo); parseRedisBBoxesData(strRetValue, alarmInfo); /* 判断事件的时效性,超过多少秒不更新就可能是超脑挂了 */ if(isEventTimeVaild(alarmInfo.EventTime)) { SPDLOG_LOGGER_WARN(m_logger, "Redis Key:{} 数据长时间没有更新,EventTime:{}",strKey, alarmInfo.EventTime); continue; } pListAlarmInfo->push_back(alarmInfo); } } /************ 获取报警规则(从数据库更新?) ************/ if(!toEQMDataBase->getPersonCountRuleInfo(*pPersonCountRuleInfo)) { SPDLOG_LOGGER_ERROR(m_logger, "《人员计数》获取报警规则失败"); continue; } if(pPersonCountRuleInfo->size() == 0) { SPDLOG_LOGGER_ERROR(m_logger, "《人员计数》无报警规则"); break; } /* 获取这个频率的报警信息 */ PersonCountRuleInfo personCountRuleInfo; for(auto& it : *pPersonCountRuleInfo) { if(it.ChannelID == pRFAInfo->ChannelID) { personCountRuleInfo = it; break; } } if(personCountRuleInfo.ChannelID < 0) { SPDLOG_LOGGER_ERROR(m_logger, "《人员计数》获取报警规则失败, ChannelID:{}", pRFAInfo->ChannelID); break; } /************ 判断是否在检测时间内 ************/ QDateTime now = QDateTime::currentDateTime(); /* 判断检测时间类型 */ if(personCountRuleInfo.RuleType == 0) { /* 所有时段 */ } else if(personCountRuleInfo.RuleType == 1) { /* 按天检测 */ if(personCountRuleInfo.StartTime <= now && personCountRuleInfo.EndTime >= now) { /* 在检测时间内 */ } else { /* 不在检测时间内 */ continue; } } else if (personCountRuleInfo.RuleType == 2) { /* 按周检测 */ if(personCountRuleInfo.week != now.date().dayOfWeek()) { /* 不在检测时间内 */ continue; } /* 在检测时间内 */ if(personCountRuleInfo.StartTime.time() <= now.time() && personCountRuleInfo.EndTime.time() >= now.time()) { /* 在检测时间内 */ } else { /* 不在检测时间内 */ continue; } } /************ 挨个房间检测人数 ************/ /************ 检测频率直播间 + 导播间房间的人数 ************/ } GThreadInfo.setThreadState(pRFAInfo.get(), RunTimeState::RUN_STATE_STOP); SPDLOG_LOGGER_INFO(m_logger, "关闭 {} 线程,ChannelID:{}", pRFAInfo->strFunctionName, pRFAInfo->ChannelID); }