ThreadPool.h 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #ifndef THREADPOOL_H
  2. #define THREADPOOL_H
  3. #include <list>
  4. #include <thread>
  5. #include <condition_variable>
  6. #include <queue>
  7. #include <functional>
  8. #include <atomic>
  9. #include <map>
  10. #include <future>
  11. // #include "spdlog/spdlog.h"
  12. // #include "fmt/std.h"
  13. /**
  14. 说明:
  15. 1、使用map存储工作线程,线程id作为键
  16. 2、采用可变参数模板函数添加任务到任务队列中
  17. 3、线程池采用单例模式
  18. 4、有两种添加工作线程的方式,一种带有返回值,一种不带返回值
  19. 常规使用方式:
  20. 1、非成员函数
  21. void func(int a, int b) { std::cout << a + b << std::endl; }
  22. CPPTP.add_task(func, 1, 2);
  23. 2、成员函数
  24. void A::func(int a, int b) { std::cout << a + b << std::endl; }
  25. A a;
  26. CPPTP.add_task(&A::func, &a, 1, 2);
  27. Qt线程使用方式
  28. 1、Qt的事件循环需要掌控整个线程,因此需要创建一个QEventLoop然后执行exec()函数,将线程控制权交给Qt的事件循环,
  29. 这样才能正常使用信号和槽
  30. 2、线程归属权问题,QObject及子类有线程归属权问题,在哪个线程中创建的就归属于哪个线程,除非使用moveToThread()修改线程,
  31. 但是C++线程无法被move,所以推荐这个子线程使用一个壳函数作为线程功能函数,在线程函数中创建功能类,这样整个类的归属权都是子线程
  32. class A;
  33. void thread_task() { new A; }
  34. CPPTP.add_task(thread_task);
  35. 3、如果功能类是单例或者全局变量,无法在线程创建之后new出来,在线程中连接信号和槽的时候设置为 Qt::DirectConnection ,
  36. 这样才能在子线程中调用槽函数,如QTimer的timeout()信号绑定的槽函数才会在子线程中运行
  37. */
  38. #define CPPTP ThreadPool::getInstance()
  39. class ThreadPool
  40. {
  41. private:
  42. ThreadPool();
  43. ThreadPool(const ThreadPool &tp) = delete;
  44. ThreadPool &operator=(const ThreadPool &tp) = delete;
  45. public:
  46. static ThreadPool& getInstance()
  47. {
  48. static ThreadPool tp;
  49. return tp;
  50. }
  51. ~ThreadPool();
  52. /**
  53. * @brief 向任务队列添加任务函数
  54. *
  55. * @tparam F 函数(指针?)
  56. * @tparam Args 参数包
  57. * @param f 万能引用
  58. * @param args 万能引用
  59. */
  60. template <typename F, typename... Args>
  61. void add_task(F &&f, Args &&...args)
  62. {
  63. /* 将函数参数和函数绑定 */
  64. std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
  65. /* 作用域是用于解锁lock */
  66. {
  67. std::unique_lock<std::mutex> lock(m_mutexTask);
  68. m_queue_Tasks.emplace(std::move(task)); /* 入队 */
  69. }
  70. /* 唤醒一个线程 */
  71. m_cond_Task.notify_one();
  72. }
  73. /* 添加带有返回值的任务函数
  74. * future内部定义了一个类型std::result_of<F(Args...)>::Type */
  75. template<typename F, typename... Args>
  76. auto add_task_with_ret(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
  77. {
  78. /* 将类型去一个别名 */
  79. using resultType = typename std::result_of<F(Args...)>::type;
  80. /* 将传入的函数与参数绑定并包装成一个可调用的指针,使用智能指针管理 */
  81. auto task = std::make_shared<std::packaged_task<resultType()>>(
  82. /* 使用forward完美转发,防止右值变成了左值 */
  83. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  84. );
  85. /* 获取future对象,用于获取返回值 */
  86. std::future<resultType> res = task->get_future();
  87. {
  88. std::unique_lock<std::mutex> lock(m_mutexTask);
  89. /* 使用lamba表达式调用封装的函数 */
  90. m_queue_Tasks.emplace([task](){
  91. (*task)();
  92. });
  93. }
  94. /* 解锁条件变量 */
  95. m_cond_Task.notify_one();
  96. /* 返回值,函数调用完成后,可以通过这个返回值获取任务的返回值 */
  97. return res;
  98. }
  99. /****** 获取当前线程池在运行的线程个数,空闲线程的个数 ******/
  100. int getThreadMaxNum(); /* 获取线程池最大线程的个数 */
  101. void setThreadMaxNum(int num); /* 设置线程池最大线程的个数 */
  102. int getThreadMiniNum(); /* 获取线程池最小线程的个数 */
  103. void setThreadMiniNum(int num); /* 设置线程池最小线程的个数 */
  104. int getThreadIdleNum(); /* 获取线程池空闲线程的个数 */
  105. int getThreadRunNum(); /* 获取线程池正在运行的线程个数 */
  106. int getThreadLiveNum(); /* 获取线程池现存的线程个数 */
  107. int getThreadAddNum(); /* 线程池每次创建线程的个数 */
  108. void setThreadAddNum(int num); /* 设置线程池每次创建线程的个数 */
  109. int getThreadMiniIdle(); /* 线程池最小空闲线程的个数 */
  110. void setThreadMiniIdle(int num); /* 设置线程池最小空闲线程的个数 */
  111. int getThreadMaxIdle(); /* 线程池最大空闲线程的个数 */
  112. void setThreadMaxIdle(int num); /* 设置线程池最大空闲线程的个数 */
  113. private:
  114. void worker(); /* 工作线程函数 */
  115. void managerThread(); /* 维护线程池的线程,如提前创建线程,销毁线程 */
  116. void createThread(int num); /* 创建新的线程 */
  117. void clearThread(); /* 清除失效的线程实例 */
  118. private:
  119. class OneThread; /* 类声明 */
  120. std::map<std::thread::id, std::thread> m_mapThreads;/* 线程数组,维护线程池 */
  121. std::list<std::thread::id> m_exitThreadID; /* 已经退出任务函数的线程ID */
  122. std::queue<std::function<void(void)>> m_queue_Tasks;/* 任务队列,任务函数都是无返回值的函数 */
  123. std::condition_variable m_cond_Task; /* 条件变量,没有任务的时候阻塞住 */
  124. bool m_stop; /* 线程是否停止 */
  125. std::mutex m_mutexExitThreadID; /* 互斥锁,搭配环境变量使用,对已经退出的线程ID上锁 */
  126. std::mutex m_mutexTask; /* 互斥锁,搭配环境变量使用,对任务队列上锁 */
  127. std::thread m_managerThread; /* 管理线程 */
  128. int m_threadMaxNum = 256; /* 默认最大线程数 */
  129. int m_threadMiniNum = 5; /* 默认线程池中最小线程数 */
  130. std::atomic<int> m_threadAddNum; /* 每次需要添加的线程个数 */
  131. std::atomic<int> m_threadMiniIdle; /* 如果空闲线程小于这个数,就添加m_ThreadAddNum个线程 */
  132. std::atomic<int> m_threadMaxIdle; /* 最大空闲的线程个数,超过这个个数一定时间,就会销毁多余的 */
  133. std::atomic<int> m_threadRunNum; /* 正在运行的线程个数 */
  134. std::atomic<int> m_threadLiveNum; /* 现有的线程个数 */
  135. std::atomic<int> m_threadExitNum; /* 需要销毁的线程个数 */
  136. };
  137. #endif /* THREADPOOL_H */