#include "SPAServer.h" #include "spdlog/spdlog.h" #include #include "ThreadPool/ThreadPool.h" #include "GlobalInfo/GlobalVariable.h" #include "GlobalInfo/GlobalConfig.h" #include "UniversalFunc.h" #include "FuncOrdinary.h" #include "FuncIllegalInvasion.h" #include "FuncOnAndOffJob.h" #include "FuncRegionalPerson.h" #include "GlobalFuncThread.h" #include SPAServer::SPAServer() { m_logger = spdlog::get("SPAServer"); if(m_logger == nullptr) { SPDLOG_ERROR("APAServer logger is nullptr"); return; } m_threadRunning = true; /* 初始化WebAPI */ // m_fromWebAPI.initWebApi("http://192.1.3.133:31000/v6/", "", "4c2f9fc91c22dd98331e47af2e2964f4"); /* 模拟违禁品算法ID,后续需要动态调整 */ // GVariable.ActContraband = "OD210_026_005246_001-IZRTKyEx"; } SPAServer::~SPAServer() { } /* 启动服务 */ void SPAServer::startServer() { /* 添加获取基础信息的线程 */ CPPTP.add_task(&SPAServer::threadFromSuperBrain, this); // threadFromSuperBrain(); /* 延时启动任务分配线程 */ std::this_thread::sleep_for(std::chrono::seconds(2)); /* 添加分配任务的线程 */ CPPTP.add_task(&SPAServer::threadDistribution, this); /* 测试Redis读取并解析数据线程 */ // CameraThreadInfo info; // info.RedisIP = "172.16.36.80"; // info.RedisPort = 32222; // info.RedisPWD = "Ff1z@TOFr^iwd%Ra"; // info.DeviceID = 117; // info.vecAction.push_back("OD210_026_005246_001-IZRTKyEx"); // GVariable.ActContraband = "OD210_026_005246_001-IZRTKyEx"; // CPPTP.add_task(&SPAServer::threadFromRedis, this, info); // threadFromRedis(info); } /** * @brief 从基础平台获取算法信息和设备信息的线程函数 * 从基础平台获取算法信息和设备信息,然后更新到EQM数据库所对应的表中 * */ void SPAServer::threadFromSuperBrain() { SPDLOG_LOGGER_INFO(m_logger, "开启 fromSuperBrainThread 线程, 获取算法和设备信息"); /* 创建变量 */ std::vector vecAlgNewInfo; std::vector vecDevNewInfo; /* 初始化WebAPI */ if(!m_fromWebAPIUseSB.initWebApi(GConfig.webapiUrl(), GConfig.webapiKey(), GConfig.webapiAppType())) { SPDLOG_LOGGER_ERROR(m_logger, "threadFromSuperBrain线程初始化WebAPI失败"); return; } /* 初始化SuperBrain */ m_fromSuperBrain.initSuperBrain(GConfig.superBrainUrl(), GConfig.superBrainAppKey(), GConfig.superBrainAppSecret()); /* 获取一次Token,这玩意好像不是必须的 */ if(!m_fromSuperBrain.getToken()) { SPDLOG_LOGGER_ERROR(m_logger, "FromSuperBrain线程获取Token失败"); return; } while (m_threadRunning) { SPDLOG_LOGGER_INFO(m_logger, "----------------- 刷新算法和设备信息 -----------------"); /* 先更新数据库的信息,防止从其他地方更改了数据库,这里没有刷新本地缓存 */ m_fromWebAPIUseSB.getAlgorithmInfo(m_vecEqmAlgInfo); m_fromWebAPIUseSB.getDeviceInfo(m_vecEqmDevInfo); m_fromWebAPIUseSB.getDeviceAlgorithmInfo(m_vecEqmDevInfo, m_listDevIDDelete); /* 从超脑获取基础信息 */ // m_fromSuperBrain.getTaskTypeList(vecAlgNewInfo); m_fromSuperBrain.getDeviceList(vecDevNewInfo); // SPDLOG_LOGGER_INFO(m_logger, "获取到的设备信息数量: {}", vecDevNewInfo.size()); /* 将设备返回的算法ID替换成带有后缀的全ID */ checkAlgorithmID(vecDevNewInfo); /* 处理算法信息,算法信息不需要在服务里处理了,在外部手动处理 */ // bool algIsUpdate = processAlgorithmInfo(vecAlgNewInfo); /* 处理设备信息 */ bool devIsUpdate = processDeviceInfo(vecDevNewInfo); if(!devIsUpdate) { SPDLOG_LOGGER_INFO(m_logger, "设备信息和算法信息没有更新"); } vecAlgNewInfo.clear(); vecDevNewInfo.clear(); /* 更新算法详细信息 */ m_mutexActionInfo.lock(); m_fromWebAPIUseSB.getActionInfo(m_listActionInfo); m_mutexActionInfo.unlock(); /* 20秒更新一次 */ std::this_thread::sleep_for(std::chrono::seconds(GVariable.CheckDeviceTime)); } SPDLOG_LOGGER_INFO(m_logger, "退出 fromSuperBrainThread 线程"); } /* 替换算法ID为全ID */ void SPAServer::checkAlgorithmID(std::vector& vecDevInfo) { /* 替换算法ID为全ID */ for(auto& dev : vecDevInfo) { for(auto act = dev.vecAlgorithmInfo.begin(); act != dev.vecAlgorithmInfo.end(); act++) { /* 替换算法ID为全ID */ std::string strFullID = GVariable.getFullActionID(act->ActionID); if(strFullID.empty()) { SPDLOG_LOGGER_INFO(m_logger, "未知的算法ID: {}", act->ActionID); /* 删除算法ID */ // act = dev.vecAlgorithmInfo.erase(act); }else { /* 替换算法ID */ act->ActionID = strFullID; } } } } /* 处理算法信息,返回值为true,说明有改变,需要重新读取 */ bool SPAServer::processAlgorithmInfo(std::vector vecNewAlgInfo) { std::vector vecAlgUpdate; std::vector vecAlgDelete; /* 对比数据库表格信息,这里只有插入和删除,没有更新 */ compareAlgorithmInfo(vecNewAlgInfo, vecAlgUpdate, vecAlgDelete); /* 更新数据库,先删除,再写入刷新 */ bool isUpdate = false; if(vecAlgDelete.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "删除算法信息"); m_fromWebAPIUseSB.deleteAlgorithmInfo(vecAlgDelete); isUpdate = true; } if(vecAlgUpdate.size() > 0) { SPDLOG_LOGGER_DEBUG(m_logger, "写入算法信息"); m_fromWebAPIUseSB.writeAlgorithmInfo(vecAlgUpdate); isUpdate = true; } return isUpdate; } /** * @brief 处理设备信息 * * @param vecNewDevInfo 传入新获取到的值 * @return true 需要重新读取数据库,获取新的数据 * @return false 无需读取数据库 */ bool SPAServer::processDeviceInfo(std::vector vecNewDevInfo) { std::vector vecDevInsert; std::vector vecDevUpdate; std::vector vecDevDelete; /*------------------------------------------------------------------------- ****** 这里只对比设备信息,不对比设备的算法信息,算法信息在下面单独对比 ******* *------------------------------------------------------------------------*/ /* 如果本地缓存没有数据,那么就全部插入 */ if(m_vecEqmDevInfo.size() > 0) { for(auto& DevInfo : vecNewDevInfo) { bool isExist = false; for(auto& it0 : m_vecEqmDevInfo) { if(DevInfo.DeviceID == it0.DeviceID) { isExist = true; /* 对比其他项是否相等,不相等就更新 */ if(DevInfo == it0) { continue; }else { vecDevUpdate.push_back(DevInfo); } break; } } if(!isExist) { vecDevInsert.push_back(DevInfo); } } }else { vecDevInsert = vecNewDevInfo; } /* 获取删除列表 */ if(vecNewDevInfo.size() > 0) { bool isExist = false; for(const auto& it : m_vecEqmDevInfo) { isExist = false; for(const auto& it0 : vecNewDevInfo) { if(it.DeviceID == it0.DeviceID) { isExist = true; break; } } if(!isExist) { vecDevDelete.push_back(it); } } }else { vecDevDelete = m_vecEqmDevInfo; } bool isUpdate = false; /* 先删除多余的数据 */ if(vecDevDelete.size() > 0) { if(m_fromWebAPIUseSB.deleteDeviceInfo(vecDevDelete)) { SPDLOG_LOGGER_DEBUG(m_logger,"删除设备信息到 tCamerInfo, 数量:{}",vecDevDelete.size()); isUpdate = true; } } /* 更新数据 */ if(vecDevUpdate.size() > 0) { if(m_fromWebAPIUseSB.updateDeviceInfo(vecDevUpdate)) { SPDLOG_LOGGER_DEBUG(m_logger, "更新设备信息到 tCamerInfo, 数量:{}", vecDevUpdate.size()); isUpdate = true; } } /* 插入数据 */ if(vecDevInsert.size() > 0) { if(m_fromWebAPIUseSB.insertDeviceInfo(vecDevInsert)) { SPDLOG_LOGGER_DEBUG(m_logger, "插入设备信息到 tCamerInfo, 数量:{}", vecDevInsert.size()); isUpdate = true; } } // for(auto& it : vecNewDevInfo) // { // for(auto& it0 : it.vecAlgorithmInfo) // { // SPDLOG_LOGGER_DEBUG(m_logger, "设备ID: {}, 算法ID: {}", it.DeviceID, it0.ActionID); // } // } /*------------------------------------------------------------------------- ************* 处理设备和算子关联的表格,单独对比设备的算法信息 ************* *------------------------------------------------------------------------*/ /* 插入新的设备信息 */ // if(vecDevInsert.size() > 0) // { // if(m_fromWebAPIUseSB.insertDeviceAlgorithmInfo(vecDevInsert)) // { // SPDLOG_LOGGER_DEBUG(m_logger, "插入设备和算法关联表(tActionCamer), 数量: {}", vecDevInsert.size()); // isUpdate = true; // } // } vecDevUpdate.clear(); /* 对比现有的设备是否需要更新算法 */ compareDeviceAlgorithmInfo(vecNewDevInfo, vecDevUpdate); if(vecDevUpdate.size() > 0) { if(m_fromWebAPIUseSB.updateDeviceAlgorithmInfo(vecDevUpdate)) { SPDLOG_LOGGER_DEBUG(m_logger, "更新设备和算法关联表(tActionCamer), 更新设备数目:{}", vecDevUpdate.size()); isUpdate = true; } } /* 删除tActionCamer表中消失的设备信息 */ if(m_listDevIDDelete.size() > 0) { if(m_fromWebAPIUseSB.deleteDeviceAlgorithmInfo(m_listDevIDDelete)) { SPDLOG_LOGGER_DEBUG(m_logger, "删除消失的设备关联的算法(tActionCamer), 数目: {}", m_listDevIDDelete.size()); isUpdate = true; } } return isUpdate; } /* 对比现有的数据和新获取到的数据,取出要删除和添加的数据 */ void SPAServer::compareAlgorithmInfo(const std::vector& vecNewInfo, std::vector& vecAlgUpdate, std::vector& vecAlgDelete) { /* 取出要添加的,如果本地缓存是0,那么全部都要添加 */ if(m_vecEqmAlgInfo.size() > 0) { for(const auto& it : vecNewInfo) { bool isExist = false; for(const auto& it0 : m_vecEqmAlgInfo) { /* 如果存在就退出循环 */ if(it.ActionID == it0.ActionID) { isExist = true; break; } } if(!isExist) { vecAlgUpdate.push_back(it); } } }else { vecAlgUpdate = vecNewInfo; } /* 取出要删除的,如果新的数据是0,那么全部都要删除 */ if(vecNewInfo.size() > 0) { bool isExist = false; for(const auto& it : m_vecEqmAlgInfo) { isExist = false; for(const auto& it0 : vecNewInfo) { if(it.ActionID == it0.ActionID) { isExist = true; break; } } if(!isExist) { vecAlgDelete.push_back(it); } } }else { vecAlgDelete = m_vecEqmAlgInfo; } } /** * @brief 对比设备和算法关联表是否需要更新 * 对比规则: * 1、这里只对比已有的设备ID,需要删除的ID在获取到tActionCamer表是就已经取出来了 * 2、如果设备ID相等,那么进一步对比算法信息是否相等 * 3、如果设备ID相等,但是算法信息数目不相等,那么直接加入更新列表 * 4、如果设备ID相等,算法信息数目相等,进一步对比算法信息 * * @param vecNewInfo * @param vecDevUpdate */ void SPAServer::compareDeviceAlgorithmInfo(const std::vector& vecNewInfo, std::vector& vecDevUpdate) { vecDevUpdate.clear(); for(const auto& it0 : vecNewInfo) { for(const auto& it1 : m_vecEqmDevInfo) { if(it0.DeviceID == it1.DeviceID) { /* 设备的算法信息数目不相等,直接加入更新列表 */ if(it0.vecAlgorithmInfo.size() != it1.vecAlgorithmInfo.size()) { vecDevUpdate.push_back(it0); break; } /* 设备的算法信息数目相等,进一步对比算法信息 */ bool isEquality = true; for(const auto& it2 : it0.vecAlgorithmInfo) { bool isEq2 = false; for(const auto& it3 : it1.vecAlgorithmInfo) { /* 这里只对比算法ID */ if(it2.ActionID != it3.ActionID) { continue; }else { isEq2 = true; break; } } if(!isEq2) { isEquality = false; break; } } if(!isEquality) { vecDevUpdate.push_back(it0); break; } } } } } /** * @brief 分派任务的线程 1、 线程定时刷新房间和摄像机的关联关系,以及和算法Action的关联关系 2、 将算法信息加入到不同的列表中 需要多个摄像机配合的加入到 m_runListRoomActionInfo 列表 不需要多个摄像机配合的加入到 m_runListActionInfo 列表 3、 每次刷新都会清空ActionInfo或者RoomActionInfo的摄像机列表,但是不会动其他基本信息,如果是列表中没有对应的信息, 就会创建新的ActionInfo,那么RunState的状态是INIT,需要开启新的线程 如果刷新完成后,算法对应的摄像机列表为空,那么对应的线程就会停止运行,将RunState设置为STOP,本线程的下一轮 循环就会删除这个ActionInfo */ void SPAServer::threadDistribution() { SPDLOG_LOGGER_INFO(m_logger, "**************** 开启分派任务线程 ****************"); /* 房间相机关联信息 */ std::list listRC; /* 存储获取到的应用和启用时间的信息 */ std::list listAppAndTime; /* 初始化WebAPI */ std::shared_ptr pFromWebAPI = std::make_shared(); pFromWebAPI->initWebApi(GConfig.webapiUrl(), GConfig.webapiKey(), GConfig.webapiAppType()); while (m_threadRunning) { /* ======================================================= */ /* 更新通道名称和摄像机名称信息 */ std::map mapChannelName; pFromWebAPI->getChannelInfo(mapChannelName); GConfig.setChannelInfo(mapChannelName); std::map mapCameraName; pFromWebAPI->getCameraInfo(mapCameraName); GConfig.setCameraInfo(mapCameraName); /* ======================================================= */ GThreadInfo.lockRunFAI(); /* 先清理已经退出的线程所用到的Action或者RoomAction */ GThreadInfo.clearNoneFuncThreadInfo(); /* 清空已经停止运行的功能类实例 */ clearNoneFuncThreadInfo(); /* ======================================================= */ /* 创建应用信息,根据从EQM数据库读取到的配置的应用信息创建 */ pFromWebAPI->getAlarmAppInfo(listAppAndTime); for(const auto& it : listAppAndTime) { /* 创建应用信息,如果已有该应用,就更新时间 * 这里只创建应用信息块,没有对应的房间、算法信息 */ GThreadInfo.addFuncThreadInfo(it); } /* 设置应用对应的频率信息 */ for(auto& func : GThreadInfo.getList()) { /* 设置频率名称 */ func->strChannelName = GConfig.getChannelName(func->ChannelID); } /* 设置对应的摄像机名称列表 */ GThreadInfo.setCameraInfo(GConfig.getCameraInfoMap()); /* 先获取EQM数据库信息,取出房间和摄像机关联信息,包括所在的频率 */ m_mutexActionInfo.lock(); pFromWebAPI->getActionInfo(m_listActionInfo); /* 将算法信息加入到不同的功能列表中,先清空功能对应的算法设备列表 */ GThreadInfo.clearActionList(); for(const auto& it : m_listActionInfo.getData()) { GThreadInfo.addActionInfo(*it); } /* 检查算法信息的状态,频率里的应用没有配置摄像机就设置为线程运行状态为停止,退出该线程 */ GThreadInfo.setNoneCameraFuncStop(); m_mutexActionInfo.unlock(); /* 开启线程 */ for(const auto& func : GThreadInfo.getList()) { if(func->RunState == RunTimeState::RUN_STATE_INIT) { /* 创建实例 */ FuncBase* pFunc = createFuncInstance(*func); if(pFunc == nullptr) { SPDLOG_LOGGER_ERROR(m_logger, "创建功能实例失败:{}", func->strFunctionName); continue; } CPPTP.add_task(&FuncBase::thread_task, pFunc); m_listFuncBase.push_back(pFunc); } } GThreadInfo.unlockRunFAI(); /* 休眠n秒,默认应该是300秒 */ std::this_thread::sleep_for(std::chrono::seconds(GVariable.CheckSet)); } SPDLOG_LOGGER_INFO(m_logger, "分派任务线程退出"); } /** * @brief 创建任务实例的函数,根据不同的功能创建出不同的实例 * * @param info * @return FuncBase* */ FuncBase* SPAServer::createFuncInstance(FuncThreadInfo& info) { if(info.appFunction == AppFunction::APP_NONE) { return nullptr; } FuncBase* pFunc = nullptr; /* 普通的功能,只用一个摄像机一个算法的功能 */ if( info.appFunction == AppFunction::APP_AllDown || info.appFunction == AppFunction::APP_NoMask || info.appFunction == AppFunction::APP_PlayPhone || info.appFunction == AppFunction::APP_Mouse || info.appFunction == AppFunction::APP_Fatigue || info.appFunction == AppFunction::APP_Contraband ) { auto tmpFunc = new FuncOrdinary(); tmpFunc->setFuncThreadInfo(info); pFunc = tmpFunc; } else if(info.appFunction == AppFunction::APP_OnWork) { /* 人员在岗检测 */ auto tmpFunc = new FuncOnAndOffJob(); tmpFunc->setFuncThreadInfo(info); pFunc = tmpFunc; } else if(info.appFunction == AppFunction::APP_Illegal) { /* 非法入侵 */ auto tmpFunc = new FuncIllegalInvasion(); tmpFunc->setFuncThreadInfo(info); pFunc = tmpFunc; } else if(info.appFunction == AppFunction::APP_Regional) { /* 区域人员检测 */ auto tmpFunc = new FuncRegionalPersonCount(); tmpFunc->setFuncThreadInfo(info); pFunc = tmpFunc; } return pFunc; } /* 清理没有在运行的线程实例 */ void SPAServer::clearNoneFuncThreadInfo() { for(auto it = m_listFuncBase.begin(); it != m_listFuncBase.end(); ) { if(!(*it)->getThreadRunning()) { /* 需要转换成相应的子类实例才能删除 */ if( (*it)->getApp() == AppFunction::APP_AllDown || (*it)->getApp() == AppFunction::APP_NoMask || (*it)->getApp() == AppFunction::APP_PlayPhone || (*it)->getApp() == AppFunction::APP_Mouse || (*it)->getApp() == AppFunction::APP_Fatigue || (*it)->getApp() == AppFunction::APP_Contraband ) { FuncOrdinary* p = dynamic_cast(*it); delete p; } else if((*it)->getApp() == AppFunction::APP_OnWork) { /* 人员在岗检测 */ FuncOnAndOffJob* p = dynamic_cast(*it); delete p; } else if((*it)->getApp() == AppFunction::APP_Illegal) { /* 非法入侵 */ FuncIllegalInvasion* p = dynamic_cast(*it); delete p; } else if((*it)->getApp() == AppFunction::APP_Regional) { /* 区域人员检测 */ FuncRegionalPersonCount* p = dynamic_cast(*it); delete p; } it = m_listFuncBase.erase(it); }else { ++it; } } }