ThreadPool_spdlog.cpp_ 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. #include "ThreadPool.h"
  2. #include "spdlog/spdlog.h"
  3. /**
  4. * @brief Construct a new Thread Pool:: Thread Pool object
  5. * 构造函数,从这里创建线程,和Linux C的线程池不同,Linux C的线程池数组管理的是线程ID,
  6. * 而C++的线程数组直接存储的就是线程函数体,里面有个function的变量,指向任务队列中任务函数
  7. *
  8. * @param numThread
  9. */
  10. ThreadPool::ThreadPool() :
  11. m_stop(false)
  12. {
  13. /* 初始化变量 */
  14. // m_threadMaxNum = std::thread::hardware_concurrency(); /* 根据CPU核心数规定线程数目 */
  15. m_threadMaxNum = 256;
  16. m_threadMiniNum = 2;
  17. m_threadAddNum = 2;
  18. m_threadMiniIdle = 1;
  19. m_threadMaxIdle = 4;
  20. m_threadRunNum = 0;
  21. m_threadLiveNum = 0;
  22. m_threadExitNum = 0;
  23. /* 创建管理线程,this表示是这个类的成员函数 */
  24. m_managerThread = std::thread(&ThreadPool::managerThread, this);
  25. SPDLOG_DEBUG("***** Hello ThreadPool *****");
  26. // /* 创建初始的numThread个线程 */
  27. // createThread(numThread);
  28. }
  29. /* 析构函数 */
  30. ThreadPool::~ThreadPool()
  31. {
  32. SPDLOG_INFO("线程池正在退出...");
  33. /* 将stop置为true */
  34. {
  35. std::unique_lock<std::mutex> lock(m_mutexTask);
  36. m_stop = true;
  37. }
  38. SPDLOG_INFO("通知所有工作线程退出...");
  39. /* 发送条件变量,通知所有线程 */
  40. m_cond_Task.notify_all();
  41. /* 等待所有的线程退出并回收完成 */
  42. while (!m_mapThreads.empty())
  43. {
  44. /* 管理线程自己退出了,所以要手动清空容器 */
  45. clearThread();
  46. std::this_thread::sleep_for(std::chrono::microseconds(100));
  47. }
  48. SPDLOG_INFO("回收管理线程...");
  49. /* 先回收管理线程 */
  50. m_managerThread.join();
  51. SPDLOG_INFO("===== 线程池退出完成 =====");
  52. }
  53. /* 工作线程函数 */
  54. void ThreadPool::worker()
  55. {
  56. m_threadLiveNum++;
  57. while (true)
  58. {
  59. /* 等待任务队列中有任务 */
  60. std::unique_lock<std::mutex> lock(m_mutexTask);
  61. /* 这里的wait第二个参数是lamba表达式,被唤醒后再次检查条件是否满足 */
  62. m_cond_Task.wait(lock, [this]
  63. { return !m_queue_Tasks.empty() || m_stop ||
  64. (m_threadExitNum > 0); });
  65. /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */
  66. if (m_stop && m_queue_Tasks.empty())
  67. {
  68. break;
  69. }
  70. /* 判断是不是需要销毁多余的线程 */
  71. if (m_threadExitNum.load() > 0 )
  72. {
  73. m_threadExitNum--;
  74. /* 再次判断有没有新任务,有就不退出 */
  75. if ( m_queue_Tasks.empty())
  76. {
  77. break;
  78. }
  79. }
  80. /* 取出任务,执行任务 */
  81. std::function<void()> task(std::move(m_queue_Tasks.front()));
  82. m_queue_Tasks.pop(); /* 取出的任务出队 */
  83. lock.unlock(); /* 解锁任务队列 */
  84. m_threadRunNum++; /* 更新线程状态数 */
  85. /* 开始执行任务 */
  86. task();
  87. m_threadRunNum--; /* 更新线程状态数 */
  88. }
  89. /* 线程结束 */
  90. m_threadLiveNum--;
  91. /* 将自身ID加入到退出列表中 */
  92. {
  93. std::unique_lock<std::mutex> lock(m_mutexExitThreadID);
  94. m_exitThreadID.emplace_back(std::this_thread::get_id());
  95. }
  96. /* 使用流将线程ID转换成字符串,不然fmt无法打印
  97. * 这里通过hash转换成了size_t */
  98. // std::stringstream ss;
  99. // ss << std::this_thread::get_id();
  100. auto threadID = std::this_thread::get_id();
  101. std::hash<std::thread::id> hasher;
  102. size_t id = hasher(threadID);
  103. SPDLOG_DEBUG("线程ID:{}退出任务循环", id);
  104. return;
  105. }
  106. /**
  107. * @brief 管理者线程,维护线程创建或死亡
  108. *
  109. */
  110. void ThreadPool::managerThread()
  111. {
  112. while (!m_stop)
  113. {
  114. /* 获取空闲线程的个数 */
  115. int num_idle = m_threadLiveNum.load() - m_threadRunNum.load();
  116. /* 判断线程是否够用,是否需要创建新的线程 */
  117. // SPDLOG_DEBUG("***** 判断是否需要添加线程 *****");
  118. if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) )
  119. {
  120. std::unique_lock<std::mutex> lock(m_mutexTask);
  121. int numTask = (int)m_queue_Tasks.size(); /* 获取任务队列中的任务个数 */
  122. lock.unlock();
  123. int numAdd = 0;
  124. if(numTask > 0)
  125. {
  126. /* 任务数 + 存在的线程个数是否大于最大线程数 */
  127. if( numTask + m_threadLiveNum.load() <= m_threadMaxNum )
  128. {
  129. /* 创建numTask个线程 */
  130. numAdd = numTask;
  131. }
  132. /* 默认添加的个数 + 存在的线程数是否大于最大线程数 */
  133. else if ( (m_threadAddNum.load() + m_threadLiveNum.load()) <= m_threadMaxNum)
  134. {
  135. /* 创建m_threadAddNum个线程 */
  136. numAdd = m_threadAddNum.load();
  137. }
  138. /* 能添加几个线程就添加几个线程 */
  139. else
  140. {
  141. numAdd = m_threadMaxNum - m_threadLiveNum.load();
  142. }
  143. }
  144. /* 空闲线程数低于设置的最小空闲线程数 */
  145. else
  146. {
  147. numAdd = m_threadMiniIdle.load() - num_idle;
  148. }
  149. if(numAdd > 0)
  150. {
  151. // SPDLOG_INFO("需要添加{}个线程", numAdd);
  152. createThread(numAdd);
  153. continue; /* 直接下一个循环,无需检查需不需要销毁线程 */
  154. }
  155. }
  156. /* 判断空闲线程是否过多,是否需要销毁几个线程 */
  157. // SPDLOG_DEBUG("***** 判断是否需要销毁线程 *****");
  158. /* 由于没规定每次销毁的线程个数,所以这里使用m_threadAddNum作为每次销毁的标准个数 */
  159. if (num_idle > m_threadMaxIdle.load())
  160. {
  161. int num_Exit = num_idle = m_threadMaxIdle.load();
  162. if (num_Exit > m_threadAddNum.load())
  163. {
  164. num_Exit = m_threadAddNum.load();
  165. }
  166. m_threadExitNum.exchange(num_Exit);
  167. SPDLOG_INFO("有{}个线程需要退出", m_threadExitNum.load());
  168. /* 唤醒需要退出的num_idle个线程 */
  169. for (int i = 0; i < num_Exit; i++)
  170. {
  171. m_cond_Task.notify_one();
  172. }
  173. }
  174. /* 回收退出的线程 */
  175. clearThread();
  176. // SPDLOG_INFO("线程池中的线程实例个数:{}", m_threads.size());
  177. std::this_thread::sleep_for(std::chrono::seconds(1));
  178. }
  179. SPDLOG_INFO("管理线程退出...");
  180. }
  181. /**
  182. * @brief 创建新的线程
  183. * 注意:这里只能使用lambda表达式,或者将do_work变成全局函数,emplace_back会调用thread构造函数将lambda表达式构造成一个std::thread实例
  184. * lambda表达式里是子线程,外面是主线程
  185. *
  186. */
  187. void ThreadPool::createThread(int num)
  188. {
  189. for (int i = 0; i < num; i++)
  190. {
  191. /* 创建线程,传入工作函数 */
  192. std::thread t(&ThreadPool::worker, this);
  193. m_mapThreads.insert(std::make_pair( t.get_id(), std::move(t) ));
  194. }
  195. }
  196. /**
  197. * @brief 删除线程池中失效的线程实例,使用递归的方法遍历全部
  198. *
  199. */
  200. void ThreadPool::clearThread()
  201. {
  202. for(auto& it : m_exitThreadID)
  203. {
  204. auto it1 = m_mapThreads.find(it);
  205. if(it1 != m_mapThreads.end())
  206. {
  207. if(it1->second.joinable())
  208. {
  209. it1->second.join();
  210. m_mapThreads.erase(it1);
  211. }
  212. }
  213. }
  214. m_exitThreadID.clear();
  215. }
  216. /* 获取线程池最大线程的个数 */
  217. int ThreadPool::getThreadMaxNum()
  218. {
  219. return m_threadMaxNum;
  220. }
  221. /* 设置线程池最大线程的个数 */
  222. void ThreadPool::setThreadMaxNum(int num)
  223. {
  224. m_threadMaxNum = num;
  225. }
  226. /* 获取线程池最大线程的个数 */
  227. int ThreadPool::getThreadMiniNum()
  228. {
  229. return m_threadMiniNum;
  230. }
  231. /* 设置线程池最大线程的个数 */
  232. void ThreadPool::setThreadMiniNum(int num)
  233. {
  234. m_threadMiniNum = num;
  235. }
  236. /* 获取线程池空闲线程的个数 */
  237. int ThreadPool::getThreadIdleNum()
  238. {
  239. return m_threadLiveNum.load() - m_threadRunNum.load();
  240. }
  241. /* 获取线程池正在运行的线程个数 */
  242. int ThreadPool::getThreadRunNum()
  243. {
  244. return m_threadRunNum.load();
  245. }
  246. /* 获取线程池现存的线程个数 */
  247. int ThreadPool::getThreadLiveNum()
  248. {
  249. return m_threadLiveNum.load();
  250. }
  251. /* 线程池每次创建线程的个数 */
  252. int ThreadPool::getThreadAddNum()
  253. {
  254. return m_threadAddNum.load();
  255. }
  256. /* 设置线程池每次创建线程的个数 */
  257. void ThreadPool::setThreadAddNum(int num)
  258. {
  259. m_threadAddNum.exchange(num);
  260. }
  261. /* 线程池最小空闲线程的个数 */
  262. int ThreadPool::getThreadMiniIdle()
  263. {
  264. return m_threadMiniIdle.load();
  265. }
  266. /* 设置线程池最小空闲线程的个数 */
  267. void ThreadPool::setThreadMiniIdle(int num)
  268. {
  269. m_threadMiniIdle.exchange(num);
  270. }
  271. /* 线程池最大空闲线程的个数 */
  272. int ThreadPool::getThreadMaxIdle()
  273. {
  274. return m_threadMaxIdle.load();
  275. }
  276. /* 设置线程池最大空闲线程的个数 */
  277. void ThreadPool::setThreadMaxIdle(int num)
  278. {
  279. m_threadMaxIdle.exchange(num);
  280. }