| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 | #include "ThreadPool.h"// #include "spdlog/spdlog.h"#include "StdLog/stdlog.h"/** * @brief Construct a new Thread Pool:: Thread Pool object *        构造函数,从这里创建线程,和Linux C的线程池不同,Linux C的线程池数组管理的是线程ID, *        而C++的线程数组直接存储的就是线程函数体,里面有个function的变量,指向任务队列中任务函数 *  * @param numThread  */ThreadPool::ThreadPool() :     m_stop(false){    /* 初始化变量 */    // m_threadMaxNum = std::thread::hardware_concurrency();   /* 根据CPU核心数规定线程数目 */    m_threadMaxNum = 256;    m_threadMiniNum = 2;    m_threadAddNum = 2;    m_threadMiniIdle = 1;    m_threadMaxIdle = 4;    m_threadRunNum = 0;    m_threadLiveNum = 0;    m_threadExitNum = 0;    /* 创建管理线程,this表示是这个类的成员函数 */    m_managerThread = std::thread(&ThreadPool::managerThread, this);    LOG_DEBUG("***** Hello ThreadPool *****");    // /* 创建初始的numThread个线程 */    // createThread(numThread);}/* 析构函数 */ThreadPool::~ThreadPool(){    LOG_INFO("线程池正在退出...");    /* 将stop置为true */    {        std::unique_lock<std::mutex> lock(m_mutexTask);        m_stop = true;    }    LOG_INFO("通知所有工作线程退出...");    /* 发送条件变量,通知所有线程 */    m_cond_Task.notify_all();    /* 等待所有的线程退出并回收完成 */    while (!m_mapThreads.empty())    {        /* 管理线程自己退出了,所以要手动清空容器 */        clearThread();        std::this_thread::sleep_for(std::chrono::microseconds(100));    }    LOG_INFO("回收管理线程...");    /* 先回收管理线程 */    m_managerThread.join();    LOG_INFO("===== 线程池退出完成 =====");}/* 工作线程函数 */void ThreadPool::worker(){    m_threadLiveNum++;    while (true)    {        /* 等待任务队列中有任务 */        std::unique_lock<std::mutex> lock(m_mutexTask);        /* 这里的wait第二个参数是lamba表达式,被唤醒后再次检查条件是否满足 */        m_cond_Task.wait(lock, [this]                               { return !m_queue_Tasks.empty() || m_stop ||                                        (m_threadExitNum > 0); });        /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */        if (m_stop && m_queue_Tasks.empty())        {            break;        }        /* 判断是不是需要销毁多余的线程 */        if (m_threadExitNum.load() > 0 )        {            m_threadExitNum--;            /* 再次判断有没有新任务,有就不退出 */            if ( m_queue_Tasks.empty())            {                break;            }        }        /* 取出任务,执行任务 */        std::function<void()> task(std::move(m_queue_Tasks.front()));        m_queue_Tasks.pop();  /* 取出的任务出队 */        lock.unlock();              /* 解锁任务队列 */        m_threadRunNum++;     /* 更新线程状态数 */        /* 开始执行任务 */        task();        m_threadRunNum--;         /* 更新线程状态数 */    }    /* 线程结束 */    m_threadLiveNum--;    /* 将自身ID加入到退出列表中 */    {        std::unique_lock<std::mutex> lock(m_mutexExitThreadID);        m_exitThreadID.emplace_back(std::this_thread::get_id());    }    /* 使用流将线程ID转换成字符串,不然fmt无法打印     * 这里通过hash转换成了size_t */    // std::stringstream ss;    // ss << std::this_thread::get_id();    auto threadID = std::this_thread::get_id();    std::hash<std::thread::id> hasher;    size_t id = hasher(threadID);    LOG_DEBUG("线程ID: " << id << " 退出任务循环");    return;}/** * @brief 管理者线程,维护线程创建或死亡 *  */void ThreadPool::managerThread(){    while (!m_stop)    {        /* 获取空闲线程的个数 */        int num_idle = m_threadLiveNum.load() - m_threadRunNum.load();        /* 判断线程是否够用,是否需要创建新的线程 */        // LOG_DEBUG("***** 判断是否需要添加线程 *****");        if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) )        {            std::unique_lock<std::mutex> lock(m_mutexTask);            int numTask = (int)m_queue_Tasks.size();            /* 获取任务队列中的任务个数 */            lock.unlock();            int numAdd = 0;            /* 重新修改增加数目,正常情况下增加 numTask + m_threadMiniIdle - num_idle 数目,达到最低空闲线程个数,               如果二者之和超过了最大线程数,就添加可以添加的最多数目 */            // if(numTask > 0)            // {                // /* 任务数 + 存在的线程个数是否大于最大线程数 */                // if( numTask + m_threadLiveNum.load() <= m_threadMaxNum )                // {                //     /* 创建numTask个线程 */                //     numAdd = numTask;                // }                // /* 默认添加的个数 + 存在的线程数是否大于最大线程数 */                // else if ( (m_threadAddNum.load() + m_threadLiveNum.load()) <= m_threadMaxNum)                // {                //     /* 创建m_threadAddNum个线程 */                //     numAdd = m_threadAddNum.load();                // }                // /* 能添加几个线程就添加几个线程 */                // else                // {                //     numAdd = m_threadMaxNum - m_threadLiveNum.load();                // }            // } else            // {            //     /* 没有新任务,但是空闲线程数低于设置的最小空闲线程数 */            //     numAdd = m_threadMiniIdle.load() - num_idle;            // }            numAdd = numTask + m_threadMiniIdle.load() - num_idle;            if(numAdd < 0)            {                numAdd = 0;   /* 如果计算出来的添加数目小于0,就不添加 */            }            /* 如果添加的线程数目超过了最大线程数,就添加可以添加的最多数目 */            if (numAdd + m_threadLiveNum.load() > m_threadMaxNum)            {                numAdd = m_threadMaxNum - m_threadLiveNum.load();            }                        if(numAdd > 0)            {                // LOG_INFO("需要添加{}个线程", numAdd);                createThread(numAdd);                continue;   /* 直接下一个循环,无需检查需不需要销毁线程 */            }                    }        if(m_threadLiveNum .load() >= m_threadMaxNum)        {            LOG_DEBUG("线程池中现存的线程个数: " << m_threadLiveNum.load() << ",超过最大线程数: " << m_threadMaxNum);        }        /* 判断空闲线程是否过多,是否需要销毁几个线程 */        // LOG_DEBUG("***** 判断是否需要销毁线程 *****");        /* 由于没规定每次销毁的线程个数,所以这里使用m_threadAddNum作为每次销毁的标准个数 */        if (num_idle > m_threadMaxIdle.load())        {            int num_Exit = num_idle = m_threadMaxIdle.load();            if (num_Exit > m_threadAddNum.load())            {                num_Exit = m_threadAddNum.load();            }            m_threadExitNum.exchange(num_Exit);            LOG_DEBUG("有" << m_threadExitNum.load() << "个线程需要退出");            /* 唤醒需要退出的num_idle个线程 */            for (int i = 0; i < num_Exit; i++)            {                m_cond_Task.notify_one();            }        }        /* 回收退出的线程 */        clearThread();        // LOG_INFO("线程池中的线程实例个数:{}", m_threads.size());        std::this_thread::sleep_for(std::chrono::seconds(1));    }    LOG_INFO("管理线程退出...");}/** * @brief 创建新的线程 *        注意:这里只能使用lambda表达式,或者将do_work变成全局函数,emplace_back会调用thread构造函数将lambda表达式构造成一个std::thread实例 *        lambda表达式里是子线程,外面是主线程 *  */void ThreadPool::createThread(int num){    if(num <=0 )    {        LOG_WARN("创建线程的个数不能小于等于0");        return;    }    for (int i = 0; i < num; i++)    {        /* 创建线程,传入工作函数 */        std::thread t(&ThreadPool::worker, this);        m_mapThreads.insert(std::make_pair( t.get_id(), std::move(t) ));    }}/** * @brief 删除线程池中失效的线程实例,使用递归的方法遍历全部 *  */void ThreadPool::clearThread(){    for(auto& it : m_exitThreadID)    {        auto it1 = m_mapThreads.find(it);        if(it1 != m_mapThreads.end())        {            if(it1->second.joinable())            {                it1->second.join();                m_mapThreads.erase(it1);            }        }    }    m_exitThreadID.clear();}/* 获取线程池最大线程的个数 */int ThreadPool::getThreadMaxNum(){    return m_threadMaxNum;}/* 设置线程池最大线程的个数 */void ThreadPool::setThreadMaxNum(int num){    m_threadMaxNum = num;}/* 获取线程池最大线程的个数 */int ThreadPool::getThreadMiniNum(){    return m_threadMiniNum;}/* 设置线程池最大线程的个数 */void ThreadPool::setThreadMiniNum(int num){    m_threadMiniNum = num;}/* 获取线程池空闲线程的个数 */int ThreadPool::getThreadIdleNum(){    return m_threadLiveNum.load() - m_threadRunNum.load();}/* 获取线程池正在运行的线程个数 */int ThreadPool::getThreadRunNum(){    return m_threadRunNum.load();} /* 获取线程池现存的线程个数 */int ThreadPool::getThreadLiveNum(){    return m_threadLiveNum.load();}/* 线程池每次创建线程的个数 */int ThreadPool::getThreadAddNum(){    return m_threadAddNum.load();}/* 设置线程池每次创建线程的个数 */void ThreadPool::setThreadAddNum(int num){    m_threadAddNum.exchange(num);}/* 线程池最小空闲线程的个数 */int ThreadPool::getThreadMiniIdle(){    return m_threadMiniIdle.load();}/* 设置线程池最小空闲线程的个数 */void ThreadPool::setThreadMiniIdle(int num){    m_threadMiniIdle.exchange(num);}/* 线程池最大空闲线程的个数 */int ThreadPool::getThreadMaxIdle(){    return m_threadMaxIdle.load();}/* 设置线程池最大空闲线程的个数 */void ThreadPool::setThreadMaxIdle(int num){    m_threadMaxIdle.exchange(num);}
 |