#include "ThreadPool.h" // #include "spdlog/spdlog.h" #include "StdLog/stdlog.h" /** * @brief Construct a new Thread Pool:: Thread Pool object * 构造函数,从这里创建线程,和Linux C的线程池不同,Linux C的线程池数组管理的是线程ID, * 而C++的线程数组直接存储的就是线程函数体,里面有个function的变量,指向任务队列中任务函数 * * @param numThread */ ThreadPool::ThreadPool() : m_stop(false) { /* 初始化变量 */ // m_threadMaxNum = std::thread::hardware_concurrency(); /* 根据CPU核心数规定线程数目 */ m_threadMaxNum = 256; m_threadMiniNum = 2; m_threadAddNum = 2; m_threadMiniIdle = 1; m_threadMaxIdle = 4; m_threadRunNum = 0; m_threadLiveNum = 0; m_threadExitNum = 0; /* 创建管理线程,this表示是这个类的成员函数 */ m_managerThread = std::thread(&ThreadPool::managerThread, this); LOG_DEBUG("***** Hello ThreadPool *****"); // /* 创建初始的numThread个线程 */ // createThread(numThread); } /* 析构函数 */ ThreadPool::~ThreadPool() { LOG_INFO("线程池正在退出..."); /* 将stop置为true */ { std::unique_lock lock(m_mutexTask); m_stop = true; } LOG_INFO("通知所有工作线程退出..."); /* 发送条件变量,通知所有线程 */ m_cond_Task.notify_all(); /* 等待所有的线程退出并回收完成 */ while (!m_mapThreads.empty()) { /* 管理线程自己退出了,所以要手动清空容器 */ clearThread(); std::this_thread::sleep_for(std::chrono::microseconds(100)); } LOG_INFO("回收管理线程..."); /* 先回收管理线程 */ m_managerThread.join(); LOG_INFO("===== 线程池退出完成 ====="); } /* 工作线程函数 */ void ThreadPool::worker() { m_threadLiveNum++; while (true) { /* 等待任务队列中有任务 */ std::unique_lock lock(m_mutexTask); /* 这里的wait第二个参数是lamba表达式,被唤醒后再次检查条件是否满足 */ m_cond_Task.wait(lock, [this] { return !m_queue_Tasks.empty() || m_stop || (m_threadExitNum > 0); }); /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */ if (m_stop && m_queue_Tasks.empty()) { break; } /* 判断是不是需要销毁多余的线程 */ if (m_threadExitNum.load() > 0 ) { m_threadExitNum--; /* 再次判断有没有新任务,有就不退出 */ if ( m_queue_Tasks.empty()) { break; } } /* 取出任务,执行任务 */ std::function task(std::move(m_queue_Tasks.front())); m_queue_Tasks.pop(); /* 取出的任务出队 */ lock.unlock(); /* 解锁任务队列 */ m_threadRunNum++; /* 更新线程状态数 */ /* 开始执行任务 */ task(); m_threadRunNum--; /* 更新线程状态数 */ } /* 线程结束 */ m_threadLiveNum--; /* 将自身ID加入到退出列表中 */ { std::unique_lock lock(m_mutexExitThreadID); m_exitThreadID.emplace_back(std::this_thread::get_id()); } /* 使用流将线程ID转换成字符串,不然fmt无法打印 * 这里通过hash转换成了size_t */ // std::stringstream ss; // ss << std::this_thread::get_id(); auto threadID = std::this_thread::get_id(); std::hash hasher; size_t id = hasher(threadID); LOG_DEBUG("线程ID: " << id << " 退出任务循环"); return; } /** * @brief 管理者线程,维护线程创建或死亡 * */ void ThreadPool::managerThread() { while (!m_stop) { /* 获取空闲线程的个数 */ int num_idle = m_threadLiveNum.load() - m_threadRunNum.load(); /* 判断线程是否够用,是否需要创建新的线程 */ // LOG_DEBUG("***** 判断是否需要添加线程 *****"); if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) ) { std::unique_lock lock(m_mutexTask); int numTask = (int)m_queue_Tasks.size(); /* 获取任务队列中的任务个数 */ lock.unlock(); int numAdd = 0; if(numTask > 0) { /* 任务数 + 存在的线程个数是否大于最大线程数 */ if( numTask + m_threadLiveNum.load() <= m_threadMaxNum ) { /* 创建numTask个线程 */ numAdd = numTask; } /* 默认添加的个数 + 存在的线程数是否大于最大线程数 */ else if ( (m_threadAddNum.load() + m_threadLiveNum.load()) <= m_threadMaxNum) { /* 创建m_threadAddNum个线程 */ numAdd = m_threadAddNum.load(); } /* 能添加几个线程就添加几个线程 */ else { numAdd = m_threadMaxNum - m_threadLiveNum.load(); } } /* 空闲线程数低于设置的最小空闲线程数 */ else { numAdd = m_threadMiniIdle.load() - num_idle; } if(numAdd > 0) { // LOG_INFO("需要添加{}个线程", numAdd); createThread(numAdd); continue; /* 直接下一个循环,无需检查需不需要销毁线程 */ } } /* 判断空闲线程是否过多,是否需要销毁几个线程 */ // LOG_DEBUG("***** 判断是否需要销毁线程 *****"); /* 由于没规定每次销毁的线程个数,所以这里使用m_threadAddNum作为每次销毁的标准个数 */ if (num_idle > m_threadMaxIdle.load()) { int num_Exit = num_idle = m_threadMaxIdle.load(); if (num_Exit > m_threadAddNum.load()) { num_Exit = m_threadAddNum.load(); } m_threadExitNum.exchange(num_Exit); LOG_INFO("有" << m_threadExitNum.load() << "个线程需要退出"); /* 唤醒需要退出的num_idle个线程 */ for (int i = 0; i < num_Exit; i++) { m_cond_Task.notify_one(); } } /* 回收退出的线程 */ clearThread(); // LOG_INFO("线程池中的线程实例个数:{}", m_threads.size()); std::this_thread::sleep_for(std::chrono::seconds(1)); } LOG_INFO("管理线程退出..."); } /** * @brief 创建新的线程 * 注意:这里只能使用lambda表达式,或者将do_work变成全局函数,emplace_back会调用thread构造函数将lambda表达式构造成一个std::thread实例 * lambda表达式里是子线程,外面是主线程 * */ void ThreadPool::createThread(int num) { for (int i = 0; i < num; i++) { /* 创建线程,传入工作函数 */ std::thread t(&ThreadPool::worker, this); m_mapThreads.insert(std::make_pair( t.get_id(), std::move(t) )); } } /** * @brief 删除线程池中失效的线程实例,使用递归的方法遍历全部 * */ void ThreadPool::clearThread() { for(auto& it : m_exitThreadID) { auto it1 = m_mapThreads.find(it); if(it1 != m_mapThreads.end()) { if(it1->second.joinable()) { it1->second.join(); m_mapThreads.erase(it1); } } } m_exitThreadID.clear(); } /* 获取线程池最大线程的个数 */ int ThreadPool::getThreadMaxNum() { return m_threadMaxNum; } /* 设置线程池最大线程的个数 */ void ThreadPool::setThreadMaxNum(int num) { m_threadMaxNum = num; } /* 获取线程池最大线程的个数 */ int ThreadPool::getThreadMiniNum() { return m_threadMiniNum; } /* 设置线程池最大线程的个数 */ void ThreadPool::setThreadMiniNum(int num) { m_threadMiniNum = num; } /* 获取线程池空闲线程的个数 */ int ThreadPool::getThreadIdleNum() { return m_threadLiveNum.load() - m_threadRunNum.load(); } /* 获取线程池正在运行的线程个数 */ int ThreadPool::getThreadRunNum() { return m_threadRunNum.load(); } /* 获取线程池现存的线程个数 */ int ThreadPool::getThreadLiveNum() { return m_threadLiveNum.load(); } /* 线程池每次创建线程的个数 */ int ThreadPool::getThreadAddNum() { return m_threadAddNum.load(); } /* 设置线程池每次创建线程的个数 */ void ThreadPool::setThreadAddNum(int num) { m_threadAddNum.exchange(num); } /* 线程池最小空闲线程的个数 */ int ThreadPool::getThreadMiniIdle() { return m_threadMiniIdle.load(); } /* 设置线程池最小空闲线程的个数 */ void ThreadPool::setThreadMiniIdle(int num) { m_threadMiniIdle.exchange(num); } /* 线程池最大空闲线程的个数 */ int ThreadPool::getThreadMaxIdle() { return m_threadMaxIdle.load(); } /* 设置线程池最大空闲线程的个数 */ void ThreadPool::setThreadMaxIdle(int num) { m_threadMaxIdle.exchange(num); }