ThreadPool.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. #include "ThreadPool.h"
  2. // #include "spdlog/spdlog.h"
  3. #include "StdLog/stdlog.h"
  4. /**
  5. * @brief Construct a new Thread Pool:: Thread Pool object
  6. * 构造函数,从这里创建线程,和Linux C的线程池不同,Linux C的线程池数组管理的是线程ID,
  7. * 而C++的线程数组直接存储的就是线程函数体,里面有个function的变量,指向任务队列中任务函数
  8. *
  9. * @param numThread
  10. */
  11. ThreadPool::ThreadPool() :
  12. m_stop(false)
  13. {
  14. /* 初始化变量 */
  15. // m_threadMaxNum = std::thread::hardware_concurrency(); /* 根据CPU核心数规定线程数目 */
  16. m_threadMaxNum = 256;
  17. m_threadMiniNum = 2;
  18. m_threadAddNum = 2;
  19. m_threadMiniIdle = 1;
  20. m_threadMaxIdle = 4;
  21. m_threadRunNum = 0;
  22. m_threadLiveNum = 0;
  23. m_threadExitNum = 0;
  24. /* 创建管理线程,this表示是这个类的成员函数 */
  25. m_managerThread = std::thread(&ThreadPool::managerThread, this);
  26. LOG_DEBUG("***** Hello ThreadPool *****");
  27. // /* 创建初始的numThread个线程 */
  28. // createThread(numThread);
  29. }
  30. /* 析构函数 */
  31. ThreadPool::~ThreadPool()
  32. {
  33. LOG_INFO("线程池正在退出...");
  34. /* 将stop置为true */
  35. {
  36. std::unique_lock<std::mutex> lock(m_mutexTask);
  37. m_stop = true;
  38. }
  39. LOG_INFO("通知所有工作线程退出...");
  40. /* 发送条件变量,通知所有线程 */
  41. m_cond_Task.notify_all();
  42. /* 等待所有的线程退出并回收完成 */
  43. while (!m_mapThreads.empty())
  44. {
  45. /* 管理线程自己退出了,所以要手动清空容器 */
  46. clearThread();
  47. std::this_thread::sleep_for(std::chrono::microseconds(100));
  48. }
  49. LOG_INFO("回收管理线程...");
  50. /* 先回收管理线程 */
  51. m_managerThread.join();
  52. LOG_INFO("===== 线程池退出完成 =====");
  53. }
  54. /* 工作线程函数 */
  55. void ThreadPool::worker()
  56. {
  57. m_threadLiveNum++;
  58. while (true)
  59. {
  60. /* 等待任务队列中有任务 */
  61. std::unique_lock<std::mutex> lock(m_mutexTask);
  62. /* 这里的wait第二个参数是lamba表达式,被唤醒后再次检查条件是否满足 */
  63. m_cond_Task.wait(lock, [this]
  64. { return !m_queue_Tasks.empty() || m_stop ||
  65. (m_threadExitNum > 0); });
  66. /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */
  67. if (m_stop && m_queue_Tasks.empty())
  68. {
  69. break;
  70. }
  71. /* 判断是不是需要销毁多余的线程 */
  72. if (m_threadExitNum.load() > 0 )
  73. {
  74. m_threadExitNum--;
  75. /* 再次判断有没有新任务,有就不退出 */
  76. if ( m_queue_Tasks.empty())
  77. {
  78. break;
  79. }
  80. }
  81. /* 取出任务,执行任务 */
  82. std::function<void()> task(std::move(m_queue_Tasks.front()));
  83. m_queue_Tasks.pop(); /* 取出的任务出队 */
  84. lock.unlock(); /* 解锁任务队列 */
  85. m_threadRunNum++; /* 更新线程状态数 */
  86. /* 开始执行任务 */
  87. task();
  88. m_threadRunNum--; /* 更新线程状态数 */
  89. }
  90. /* 线程结束 */
  91. m_threadLiveNum--;
  92. /* 将自身ID加入到退出列表中 */
  93. {
  94. std::unique_lock<std::mutex> lock(m_mutexExitThreadID);
  95. m_exitThreadID.emplace_back(std::this_thread::get_id());
  96. }
  97. /* 使用流将线程ID转换成字符串,不然fmt无法打印
  98. * 这里通过hash转换成了size_t */
  99. // std::stringstream ss;
  100. // ss << std::this_thread::get_id();
  101. auto threadID = std::this_thread::get_id();
  102. std::hash<std::thread::id> hasher;
  103. size_t id = hasher(threadID);
  104. LOG_DEBUG("线程ID: " << id << " 退出任务循环");
  105. return;
  106. }
  107. /**
  108. * @brief 管理者线程,维护线程创建或死亡
  109. *
  110. */
  111. void ThreadPool::managerThread()
  112. {
  113. while (!m_stop)
  114. {
  115. /* 获取空闲线程的个数 */
  116. int num_idle = m_threadLiveNum.load() - m_threadRunNum.load();
  117. /* 判断线程是否够用,是否需要创建新的线程 */
  118. // LOG_DEBUG("***** 判断是否需要添加线程 *****");
  119. if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) )
  120. {
  121. std::unique_lock<std::mutex> lock(m_mutexTask);
  122. int numTask = (int)m_queue_Tasks.size(); /* 获取任务队列中的任务个数 */
  123. lock.unlock();
  124. int numAdd = 0;
  125. /* 重新修改增加数目,正常情况下增加 numTask + m_threadMiniIdle - num_idle 数目,达到最低空闲线程个数,
  126. 如果二者之和超过了最大线程数,就添加可以添加的最多数目 */
  127. // if(numTask > 0)
  128. // {
  129. // /* 任务数 + 存在的线程个数是否大于最大线程数 */
  130. // if( numTask + m_threadLiveNum.load() <= m_threadMaxNum )
  131. // {
  132. // /* 创建numTask个线程 */
  133. // numAdd = numTask;
  134. // }
  135. // /* 默认添加的个数 + 存在的线程数是否大于最大线程数 */
  136. // else if ( (m_threadAddNum.load() + m_threadLiveNum.load()) <= m_threadMaxNum)
  137. // {
  138. // /* 创建m_threadAddNum个线程 */
  139. // numAdd = m_threadAddNum.load();
  140. // }
  141. // /* 能添加几个线程就添加几个线程 */
  142. // else
  143. // {
  144. // numAdd = m_threadMaxNum - m_threadLiveNum.load();
  145. // }
  146. // } else
  147. // {
  148. // /* 没有新任务,但是空闲线程数低于设置的最小空闲线程数 */
  149. // numAdd = m_threadMiniIdle.load() - num_idle;
  150. // }
  151. numAdd = numTask + m_threadMiniIdle.load() - num_idle;
  152. if(numAdd < 0)
  153. {
  154. numAdd = 0; /* 如果计算出来的添加数目小于0,就不添加 */
  155. }
  156. /* 如果添加的线程数目超过了最大线程数,就添加可以添加的最多数目 */
  157. if (numAdd + m_threadLiveNum.load() > m_threadMaxNum)
  158. {
  159. numAdd = m_threadMaxNum - m_threadLiveNum.load();
  160. }
  161. if(numAdd > 0)
  162. {
  163. // LOG_INFO("需要添加{}个线程", numAdd);
  164. createThread(numAdd);
  165. continue; /* 直接下一个循环,无需检查需不需要销毁线程 */
  166. }
  167. }
  168. if(m_threadLiveNum .load() >= m_threadMaxNum)
  169. {
  170. LOG_DEBUG("线程池中现存的线程个数: " << m_threadLiveNum.load() << ",超过最大线程数: " << m_threadMaxNum);
  171. }
  172. /* 判断空闲线程是否过多,是否需要销毁几个线程 */
  173. // LOG_DEBUG("***** 判断是否需要销毁线程 *****");
  174. /* 由于没规定每次销毁的线程个数,所以这里使用m_threadAddNum作为每次销毁的标准个数 */
  175. if (num_idle > m_threadMaxIdle.load())
  176. {
  177. int num_Exit = num_idle = m_threadMaxIdle.load();
  178. if (num_Exit > m_threadAddNum.load())
  179. {
  180. num_Exit = m_threadAddNum.load();
  181. }
  182. m_threadExitNum.exchange(num_Exit);
  183. LOG_DEBUG("有" << m_threadExitNum.load() << "个线程需要退出");
  184. /* 唤醒需要退出的num_idle个线程 */
  185. for (int i = 0; i < num_Exit; i++)
  186. {
  187. m_cond_Task.notify_one();
  188. }
  189. }
  190. /* 回收退出的线程 */
  191. clearThread();
  192. // LOG_INFO("线程池中的线程实例个数:{}", m_threads.size());
  193. std::this_thread::sleep_for(std::chrono::seconds(1));
  194. }
  195. LOG_INFO("管理线程退出...");
  196. }
  197. /**
  198. * @brief 创建新的线程
  199. * 注意:这里只能使用lambda表达式,或者将do_work变成全局函数,emplace_back会调用thread构造函数将lambda表达式构造成一个std::thread实例
  200. * lambda表达式里是子线程,外面是主线程
  201. *
  202. */
  203. void ThreadPool::createThread(int num)
  204. {
  205. if(num <=0 )
  206. {
  207. LOG_WARN("创建线程的个数不能小于等于0");
  208. return;
  209. }
  210. for (int i = 0; i < num; i++)
  211. {
  212. /* 创建线程,传入工作函数 */
  213. std::thread t(&ThreadPool::worker, this);
  214. m_mapThreads.insert(std::make_pair( t.get_id(), std::move(t) ));
  215. }
  216. }
  217. /**
  218. * @brief 删除线程池中失效的线程实例,使用递归的方法遍历全部
  219. *
  220. */
  221. void ThreadPool::clearThread()
  222. {
  223. for(auto& it : m_exitThreadID)
  224. {
  225. auto it1 = m_mapThreads.find(it);
  226. if(it1 != m_mapThreads.end())
  227. {
  228. if(it1->second.joinable())
  229. {
  230. it1->second.join();
  231. m_mapThreads.erase(it1);
  232. }
  233. }
  234. }
  235. m_exitThreadID.clear();
  236. }
  237. /* 获取线程池最大线程的个数 */
  238. int ThreadPool::getThreadMaxNum()
  239. {
  240. return m_threadMaxNum;
  241. }
  242. /* 设置线程池最大线程的个数 */
  243. void ThreadPool::setThreadMaxNum(int num)
  244. {
  245. m_threadMaxNum = num;
  246. }
  247. /* 获取线程池最大线程的个数 */
  248. int ThreadPool::getThreadMiniNum()
  249. {
  250. return m_threadMiniNum;
  251. }
  252. /* 设置线程池最大线程的个数 */
  253. void ThreadPool::setThreadMiniNum(int num)
  254. {
  255. m_threadMiniNum = num;
  256. }
  257. /* 获取线程池空闲线程的个数 */
  258. int ThreadPool::getThreadIdleNum()
  259. {
  260. return m_threadLiveNum.load() - m_threadRunNum.load();
  261. }
  262. /* 获取线程池正在运行的线程个数 */
  263. int ThreadPool::getThreadRunNum()
  264. {
  265. return m_threadRunNum.load();
  266. }
  267. /* 获取线程池现存的线程个数 */
  268. int ThreadPool::getThreadLiveNum()
  269. {
  270. return m_threadLiveNum.load();
  271. }
  272. /* 线程池每次创建线程的个数 */
  273. int ThreadPool::getThreadAddNum()
  274. {
  275. return m_threadAddNum.load();
  276. }
  277. /* 设置线程池每次创建线程的个数 */
  278. void ThreadPool::setThreadAddNum(int num)
  279. {
  280. m_threadAddNum.exchange(num);
  281. }
  282. /* 线程池最小空闲线程的个数 */
  283. int ThreadPool::getThreadMiniIdle()
  284. {
  285. return m_threadMiniIdle.load();
  286. }
  287. /* 设置线程池最小空闲线程的个数 */
  288. void ThreadPool::setThreadMiniIdle(int num)
  289. {
  290. m_threadMiniIdle.exchange(num);
  291. }
  292. /* 线程池最大空闲线程的个数 */
  293. int ThreadPool::getThreadMaxIdle()
  294. {
  295. return m_threadMaxIdle.load();
  296. }
  297. /* 设置线程池最大空闲线程的个数 */
  298. void ThreadPool::setThreadMaxIdle(int num)
  299. {
  300. m_threadMaxIdle.exchange(num);
  301. }