|
@@ -1,5 +1,7 @@
|
|
|
#include "ThreadPool.h"
|
|
|
-#include <iterator>
|
|
|
+
|
|
|
+#include "spdlog/spdlog.h"
|
|
|
+#include "fmt/std.h"
|
|
|
|
|
|
/**
|
|
|
* @brief Construct a new Thread Pool:: Thread Pool object
|
|
@@ -8,24 +10,25 @@
|
|
|
*
|
|
|
* @param numThread
|
|
|
*/
|
|
|
-ThreadPool::ThreadPool():m_stop(false)
|
|
|
+ThreadPool::ThreadPool() :
|
|
|
+ m_stop(false)
|
|
|
{
|
|
|
|
|
|
/* 初始化变量 */
|
|
|
- m_threadMaxNum = 256;
|
|
|
+ m_threadMaxNum = std::thread::hardware_concurrency(); /* 根据CPU核心数规定线程数目 */
|
|
|
m_threadMiniNum = 3;
|
|
|
m_threadLiveNum = 0;
|
|
|
- m_threadMaxNum = 256;
|
|
|
m_threadExitNum = 0;
|
|
|
m_threadAddNum = 5;
|
|
|
m_threadMiniIdle = 2;
|
|
|
m_threadMaxIdle = 6;
|
|
|
m_threadRunNum = 0;
|
|
|
|
|
|
- /* 创建管理线程 */
|
|
|
- SPDLOG_DEBUG("***** 创建管理线程 *****");
|
|
|
+ /* 创建管理线程,this表示是这个类的成员函数 */
|
|
|
m_managerThread = std::thread(&ThreadPool::managerThread, this);
|
|
|
|
|
|
+ SPDLOG_DEBUG("***** Hello ThreadPool *****");
|
|
|
+
|
|
|
// /* 创建初始的numThread个线程 */
|
|
|
// createThread(numThread);
|
|
|
}
|
|
@@ -38,44 +41,77 @@ ThreadPool::~ThreadPool()
|
|
|
std::unique_lock<std::mutex> lock(m_mutexTask);
|
|
|
m_stop = true;
|
|
|
}
|
|
|
- SPDLOG_INFO("回收管理线程...");
|
|
|
- /* 先回收管理线程 */
|
|
|
- m_managerThread.join();
|
|
|
|
|
|
SPDLOG_INFO("通知所有工作线程退出...");
|
|
|
/* 发送条件变量,通知所有线程 */
|
|
|
m_cond_Task.notify_all();
|
|
|
+ /* 等待所有的线程退出并回收完成 */
|
|
|
+ while (!m_mapThreads.empty())
|
|
|
+ {
|
|
|
+ /* 管理线程自己退出了,所以要手动清空容器 */
|
|
|
+ clearThread();
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
+ }
|
|
|
+
|
|
|
+ SPDLOG_INFO("回收管理线程...");
|
|
|
+ /* 先回收管理线程 */
|
|
|
+ m_managerThread.join();
|
|
|
+ SPDLOG_INFO("===== 线程池退出完成 =====");
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/* 工作线程函数 */
|
|
|
+void ThreadPool::worker()
|
|
|
+{
|
|
|
+ m_threadLiveNum++;
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ /* 等待任务队列中有任务 */
|
|
|
+ std::unique_lock<std::mutex> lock(m_mutexTask);
|
|
|
+ /* 这里的wait第二个参数是lamba表达式,在唤醒后会进行判断是否满足条件 */
|
|
|
+ m_cond_Task.wait(lock, [this]
|
|
|
+ { return !m_queue_Tasks.empty() || m_stop ||
|
|
|
+ (m_threadExitNum > 0); });
|
|
|
+ /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */
|
|
|
+ if (m_stop && m_queue_Tasks.empty())
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ /* 判断是不是需要销毁多余的线程 */
|
|
|
+ if (m_threadExitNum.load() > 0 )
|
|
|
+ {
|
|
|
+ m_threadExitNum--;
|
|
|
+ /* 再次判断有没有新任务,有就不退出 */
|
|
|
+ if ( m_queue_Tasks.empty())
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /* 取出任务,执行任务 */
|
|
|
+ std::function<void()> task(std::move(m_queue_Tasks.front()));
|
|
|
+ m_queue_Tasks.pop(); /* 取出的任务出队 */
|
|
|
+ lock.unlock(); /* 解锁任务队列 */
|
|
|
+ m_threadRunNum++; /* 更新线程状态数 */
|
|
|
+ /* 开始执行任务 */
|
|
|
+ task();
|
|
|
+ m_threadRunNum--; /* 更新线程状态数 */
|
|
|
+ }
|
|
|
|
|
|
- /* 注意,这里不需要回收线程资源了,都在类中回收完了 */
|
|
|
- /* 使用jion回收线程资源 */
|
|
|
- // for (auto& th : m_threads)
|
|
|
- // {
|
|
|
- // th.join();
|
|
|
- // }
|
|
|
+ /* 线程结束 */
|
|
|
+ m_threadLiveNum--;
|
|
|
+ /* 将自身ID加入到退出列表中 */
|
|
|
+ {
|
|
|
+ std::unique_lock<std::mutex> lock(m_mutexExitThreadID);
|
|
|
+ m_exitThreadID.emplace_back(std::this_thread::get_id());
|
|
|
+ }
|
|
|
+ /* 使用流将线程ID转换成字符串,不然fmt无法打印 */
|
|
|
+ std::stringstream ss;
|
|
|
+ ss << std::this_thread::get_id();
|
|
|
+ SPDLOG_DEBUG("线程ID:{}退出任务循环", ss.str());
|
|
|
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
-/**
|
|
|
- * @brief 向任务队列添加任务函数
|
|
|
- *
|
|
|
- * @tparam F 函数(指针?)
|
|
|
- * @tparam Args 参数包
|
|
|
- * @param f 万能引用
|
|
|
- * @param args 万能引用
|
|
|
- */
|
|
|
-// template <typename F, typename... Args>
|
|
|
-// void ThreadPool::add_Task(F &&f, Args &&...args)
|
|
|
-// {
|
|
|
-// /* 将函数参数和函数绑定 */
|
|
|
-// std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
|
|
|
-// /* 作用域是用于解锁lock */
|
|
|
-// {
|
|
|
-// std::unique_lock<std::mutex> lock(m_mutexTask);
|
|
|
-// m_queue_Tasks.emplace(std::move(task)); /* 入队 */
|
|
|
-// }
|
|
|
-// /* 唤醒一个线程 */
|
|
|
-// m_cond_Task.notify_one();
|
|
|
-// }
|
|
|
|
|
|
/**
|
|
|
* @brief 管理者线程,维护线程创建或死亡
|
|
@@ -89,33 +125,45 @@ void ThreadPool::managerThread()
|
|
|
int num_idle = m_threadLiveNum.load() - m_threadRunNum.load();
|
|
|
/* 判断线程是否够用,是否需要创建新的线程 */
|
|
|
// SPDLOG_DEBUG("***** 判断是否需要添加线程 *****");
|
|
|
- if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum))
|
|
|
+ if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) )
|
|
|
{
|
|
|
std::unique_lock<std::mutex> lock(m_mutexTask);
|
|
|
int numTask = (int)m_queue_Tasks.size(); /* 获取任务队列中的任务个数 */
|
|
|
lock.unlock();
|
|
|
int numAdd = 0;
|
|
|
- if (m_threadAddNum.load() + m_threadLiveNum.load() > m_threadMaxNum)
|
|
|
- {
|
|
|
- numAdd = m_threadMaxNum - m_threadLiveNum.load();
|
|
|
- }
|
|
|
- else if (numTask > m_threadAddNum.load()) /* 判断一次性需要添加多少线程 */
|
|
|
+ if(numTask > 0)
|
|
|
{
|
|
|
- if(numTask + m_threadLiveNum.load() < m_threadMaxNum)
|
|
|
+ /* 任务数 + 存在的线程个数是否大于最大线程数 */
|
|
|
+ if( numTask + m_threadLiveNum.load() <= m_threadMaxNum )
|
|
|
{
|
|
|
/* 创建numTask个线程 */
|
|
|
numAdd = numTask;
|
|
|
}
|
|
|
+ /* 默认添加的个数 + 存在的线程数是否大于最大线程数 */
|
|
|
+ else if ( (m_threadAddNum.load() + m_threadLiveNum.load()) <= m_threadMaxNum)
|
|
|
+ {
|
|
|
+ /* 创建m_threadAddNum个线程 */
|
|
|
+ numAdd = m_threadAddNum.load();
|
|
|
+ }
|
|
|
+ /* 能添加几个线程就添加几个线程 */
|
|
|
+ else
|
|
|
+ {
|
|
|
+ numAdd = m_threadMaxNum - m_threadLiveNum.load();
|
|
|
+ }
|
|
|
}
|
|
|
+ /* 空闲线程数低于设置的最小空闲线程数 */
|
|
|
else
|
|
|
{
|
|
|
- /* 创建m_threadAddNum个线程 */
|
|
|
- numAdd = m_threadAddNum.load();
|
|
|
+ numAdd = m_threadMiniIdle.load() - num_idle;
|
|
|
}
|
|
|
- SPDLOG_INFO("需要添加{}个线程", numAdd);
|
|
|
- createThread(numAdd);
|
|
|
-
|
|
|
- continue; /* 直接下一个循环,无需检查需不需要销毁线程 */
|
|
|
+
|
|
|
+ if(numAdd > 0)
|
|
|
+ {
|
|
|
+ SPDLOG_INFO("需要添加{}个线程", numAdd);
|
|
|
+ createThread(numAdd);
|
|
|
+ continue; /* 直接下一个循环,无需检查需不需要销毁线程 */
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/* 判断空闲线程是否过多,是否需要销毁几个线程 */
|
|
@@ -143,7 +191,7 @@ void ThreadPool::managerThread()
|
|
|
// SPDLOG_INFO("线程池中的线程实例个数:{}", m_threads.size());
|
|
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
|
|
}
|
|
|
- return;
|
|
|
+ SPDLOG_INFO("管理线程退出...");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -156,51 +204,9 @@ void ThreadPool::createThread(int num)
|
|
|
{
|
|
|
for (int i = 0; i < num; i++)
|
|
|
{
|
|
|
-
|
|
|
- m_threads.emplace_back(this);
|
|
|
-
|
|
|
- /* 向线程数组中插入线程函数 */
|
|
|
- // m_threads.emplace_back(std::move(std::thread(do_work))); //错误,线程不能调用成员函数
|
|
|
- /* 这里只能使用lambda表达式,或者将do_work变成全局函数,emplace_back会调用thread构造函数将lambda表达式构造成一个std::thread实例 */
|
|
|
- // m_threads.emplace_back([this]()
|
|
|
- // {
|
|
|
- // this->m_threadExistNum++;
|
|
|
- // while (1)
|
|
|
- // {
|
|
|
- // /* 等待任务队列中有任务 */
|
|
|
- // std::unique_lock<std::mutex> lock(m_mutexTask);
|
|
|
- // m_cond_Task.wait(lock, [this]
|
|
|
- // { return !m_queue_Tasks.empty() || m_stop; });
|
|
|
- // /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */
|
|
|
- // if (m_stop && m_queue_Tasks.empty())
|
|
|
- // {
|
|
|
- // break;
|
|
|
- // }
|
|
|
- // /* 判断是不是需要销毁多余的线程 */
|
|
|
- // if (m_threadExitNum > 0 )
|
|
|
- // {
|
|
|
- // this->m_threadExitNum--;
|
|
|
- // /* 再次判断有没有新任务,有就不退出 */
|
|
|
- // if ( m_queue_Tasks.empty())
|
|
|
- // {
|
|
|
-
|
|
|
- // break;
|
|
|
- // }
|
|
|
- // }
|
|
|
- // /* 取出任务,执行任务 */
|
|
|
- // std::function<void()> task(std::move(m_queue_Tasks.front()));
|
|
|
- // m_queue_Tasks.pop(); /* 取出的任务出队 */
|
|
|
- // lock.unlock(); /* 解锁任务队列 */
|
|
|
-
|
|
|
- // this->m_threadRunNum++; /* 更新线程状态数 */
|
|
|
- // /* 开始执行任务 */
|
|
|
- // task();
|
|
|
-
|
|
|
- // this->m_threadRunNum--; /* 更新线程状态数 */
|
|
|
- // }
|
|
|
- // this->m_threadExistNum--;
|
|
|
- // return;
|
|
|
- // });
|
|
|
+ /* 创建线程,传入工作函数 */
|
|
|
+ std::thread t(&ThreadPool::worker, this);
|
|
|
+ m_mapThreads.insert(std::make_pair( t.get_id(), std::move(t) ));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -210,66 +216,21 @@ void ThreadPool::createThread(int num)
|
|
|
*/
|
|
|
void ThreadPool::clearThread()
|
|
|
{
|
|
|
- std::list<OneThread>::iterator it = m_threads.begin();
|
|
|
- while(it != m_threads.end())
|
|
|
+ for(auto& it : m_exitThreadID)
|
|
|
{
|
|
|
- if (!it->isLive())
|
|
|
+ auto it1 = m_mapThreads.find(it);
|
|
|
+ if(it1 != m_mapThreads.end())
|
|
|
{
|
|
|
- break;
|
|
|
+ if(it1->second.joinable())
|
|
|
+ {
|
|
|
+ it1->second.join();
|
|
|
+ m_mapThreads.erase(it1);
|
|
|
+ }
|
|
|
}
|
|
|
- it++;
|
|
|
- }
|
|
|
- /* 递归退出条件,迭代器到最后一个了,表示线程池中已经没有失效的元素了 */
|
|
|
- if (it == m_threads.end())
|
|
|
- {
|
|
|
- return;
|
|
|
}
|
|
|
- m_threads.erase(it);
|
|
|
- clearThread(); /* 递归调用 */
|
|
|
+ m_exitThreadID.clear();
|
|
|
}
|
|
|
|
|
|
-// /* 任务函数 */
|
|
|
-// void ThreadPool::do_work()
|
|
|
-// {
|
|
|
-// m_threadExistNum++;
|
|
|
-// while (1)
|
|
|
-// {
|
|
|
-// /* 等待任务队列中有任务 */
|
|
|
-// std::unique_lock<std::mutex> lock(m_mutexTask);
|
|
|
-// m_cond_Task.wait(lock, [this]
|
|
|
-// { return !m_queue_Tasks.empty() || m_stop; });
|
|
|
-// /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */
|
|
|
-// if (m_stop && m_queue_Tasks.empty())
|
|
|
-// {
|
|
|
-// break;
|
|
|
-// }
|
|
|
-// /* 判断是不是需要销毁多余的线程 */
|
|
|
-// if (m_threadExitNum > 0 )
|
|
|
-// {
|
|
|
-// m_threadExitNum--;
|
|
|
-// /* 再次判断有没有新任务,有就不退出 */
|
|
|
-// if ( m_queue_Tasks.empty())
|
|
|
-// {
|
|
|
-
|
|
|
-// break;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// /* 取出任务,执行任务 */
|
|
|
-// std::function<void()> task(std::move(m_queue_Tasks.front()));
|
|
|
-// m_queue_Tasks.pop(); /* 取出的任务出队 */
|
|
|
-// lock.unlock(); /* 解锁任务队列 */
|
|
|
-
|
|
|
-// m_threadRunNum++; /* 更新线程状态数 */
|
|
|
-// /* 开始执行任务 */
|
|
|
-// task();
|
|
|
-
|
|
|
-// m_threadRunNum--; /* 更新线程状态数 */
|
|
|
-// }
|
|
|
-// /* 线程结束 */
|
|
|
-// m_threadExistNum--;
|
|
|
-
|
|
|
-// return;
|
|
|
-// }
|
|
|
|
|
|
|
|
|
/* 获取线程池最大线程的个数 */
|