TWTimer.hpp 11 KB


  1. #include <chrono>
  2. #include <functional>
  3. #include <list>
  4. #include <mutex>
  5. #include <thread>
  6. #include <vector>
  7. #include "ThreadPool/ThreadPool.h"
  8. #include "FmtLog/fmtlog.h"
  9. /**
  10. * @brief 这是一个时间轮定时器的实现,将任务添加到时间轮中,时间轮会在指定的时间点执行任务
  11. * 这个定时器的最大定时时间是1小时,最小时间间隔是1ms
  12. *
  13. *
  14. * 设计说明:
  15. * 1、时间轮的时间点是固定的,每个时间点的时间间隔是1ms,最大时间点是1000 * 60 * 60个,也就是1小时
  16. * 2、任务是挂在一个个时间点上的,同一个时间点可以有多个任务
  17. * 3、时间轮的执行是在一个新的线程中执行的,每隔1ms检测一次是否有任务需要执行
  18. * 4、当前时间点是下一个将要执行的时间点,新加入的立即执行的任务就会加入到当前时间点上
  19. * 5、时间轮的时间点是循环的,当到达最大时间点时,会重新从0开始
  20. * 6、管理线程是100us运行一次,在运行时会进行时间累积,相差1ms就会执行一次任务
  21. *
  22. * 缺点:
  23. * 1、时间轮的时间点是固定的,如果任务的时间点不在时间轮的时间点上,就会被延迟执行
  24. */
  25. /** ==================================================================
  26. * ************************** 任务结构体 ******************************
  27. * ================================================================== */
  28. /**
  29. * 任务时间点结构体,用来标识任务在时间轮中的位置
  30. */
  31. struct TaskTimerPoint
  32. {
  33. long firstLevelIndex = 0;
  34. long secondLevelIndex = 0;
  35. long thirdLevelIndex = 0;
  36. TaskTimerPoint() = default;
  37. TaskTimerPoint& operator=(const TaskTimerPoint& other) {
  38. firstLevelIndex = other.firstLevelIndex;
  39. secondLevelIndex = other.secondLevelIndex;
  40. thirdLevelIndex = other.thirdLevelIndex;
  41. return *this;
  42. }
  43. };
  44. /* 定义函数指针 */
  45. using TaskFunc = std::function<void()>;
  46. /* 定义任务结构体 */
  47. struct Task {
  48. bool is_loop; /* 是否循环定时 */
  49. long interval; /* 定时间隔 */
  50. TaskFunc func; /* 任务函数 */
  51. TaskTimerPoint timerPoint; /* 任务时间点,在时间轮中的位置 */
  52. };
  53. /** ==================================================================
  54. * ***************************** 任务类 ******************************
  55. * ================================================================== */
  56. class TimerWheel {
  57. public:
  58. /* 构造函数,第一个参数是任务时间点容器最大数量,第二个参数是最低检测时间单位 */
  59. explicit TimerWheel()
  60. : m_currentFirstIndex(0),
  61. m_currentSecondIndex(0),
  62. m_currentThirdIndex(0),
  63. m_currentIndex(0)
  64. {
  65. m_wheel_size = 1000 * 60 * 60; /* 1小时定时间隔 */
  66. m_interval_ms = 1; /* 1ms */
  67. m_firstLevelWheel.resize(1000);
  68. m_secondLevelWheel.resize(60);
  69. m_thirdLevelWheel.resize(60);
  70. }
  71. ~TimerWheel() {
  72. Stop();
  73. }
  74. /* 开启时间轮 */
  75. void Start()
  76. {
  77. if (m_running) {
  78. return;
  79. }
  80. m_running = true;
  81. /* 开启新线程,定时检测队列 */
  82. m_thread = std::thread([this]() {
  83. std::chrono::steady_clock::time_point lastTime = std::chrono::steady_clock::now();
  84. while (m_running) {
  85. /* 100us计算一次距离上次执行的间隔时间,超过1ms就执行一次 */
  86. std::this_thread::sleep_for(std::chrono::microseconds(m_tick_us));
  87. std::chrono::steady_clock::time_point nowTime = std::chrono::steady_clock::now();
  88. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(nowTime - lastTime);
  89. if(duration.count() >= 900) {
  90. Tick();
  91. lastTime = nowTime;
  92. }
  93. }
  94. FMTLOG_INFO("TimerWheel thread exit");
  95. });
  96. }
  97. void Stop()
  98. {
  99. if (!m_running) {
  100. return;
  101. }
  102. m_running = false;
  103. if (m_thread.joinable()) {
  104. m_thread.join();
  105. }
  106. }
  107. /* 绑定任务函数和参数 */
  108. template<typename F, typename... Args>
  109. std::function<void()> bindTask(int timeout_ms, F&& f, Args&&... args)
  110. {
  111. return std::bind(std::forward<F>(f), std::forward<Args>(args)...);
  112. }
  113. /* 添加任务函数 */
  114. void AddTask(long timeout_ms, std::function<void()> func, bool is_loop = false)
  115. {
  116. /* 这里需要将当前时间点一起上锁,防止立即执行的任务会错过 */
  117. std::lock_guard<std::mutex> lock(m_mutex);
  118. /* 计算出定时的时间点数目 */
  119. long ticks = timeout_ms / m_interval_ms;
  120. if(ticks > m_wheel_size) {
  121. FMTLOG_WARN("timeout is too long");
  122. return;
  123. }
  124. Task task;
  125. task.is_loop = is_loop;
  126. task.interval = timeout_ms;
  127. task.func = func;
  128. task.timerPoint.firstLevelIndex = 0;
  129. task.timerPoint.secondLevelIndex = 0;
  130. task.timerPoint.thirdLevelIndex = 0;
  131. /* 加上当前时间点的位置偏移,计算出需要插入的时间点 */
  132. // long index = m_currentFirstIndex + m_currentSecondIndex * m_firstLevelCount + m_currentThirdIndex * m_secondLevelCount * m_firstLevelCount + ticks;
  133. long index = m_currentIndex + ticks;
  134. /* 计算出需要的位置,如果超出了最大范围就放到过去的时间点中 */
  135. index = index % m_wheel_size;
  136. /* 计算出时间点的位置 */
  137. task.timerPoint.firstLevelIndex = index % m_firstLevelCount;
  138. task.timerPoint.secondLevelIndex = (index / m_firstLevelCount) % m_secondLevelCount;
  139. task.timerPoint.thirdLevelIndex = (index / m_firstLevelCount / m_secondLevelCount) % m_thirdLevelCount;
  140. /* 加入到时间轮的任务队列中 */
  141. if(task.timerPoint.thirdLevelIndex != m_currentThirdIndex)
  142. {
  143. /* 不在当前正在轮训的第三层时间点内,直接加入到对应的时间点中即可 */
  144. m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
  145. }
  146. /* 在当前正在轮训的第三层时间点内,继续判断是否已经是过去的时间点 */
  147. else
  148. {
  149. if(task.timerPoint.secondLevelIndex < m_currentSecondIndex ||
  150. (task.timerPoint.secondLevelIndex == m_currentSecondIndex && task.timerPoint.firstLevelIndex < m_currentFirstIndex))
  151. {
  152. /* 在当前正在轮训的第三层时间点内,但是已经是过去的时间点,加入到当前第三层的队列中 */
  153. m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
  154. }
  155. /* 在当前第三层还未执行的时间点内 */
  156. else if(task.timerPoint.secondLevelIndex > m_currentFirstIndex)
  157. {
  158. /* 在第二轮未执行的点中,加入到第二轮队列中 */
  159. m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
  160. }
  161. else
  162. {
  163. /* 在当前正在轮训的第二层时间点内,且是未来的时间点,加入到对应的第一层时间点中 */
  164. m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
  165. }
  166. }
  167. }
  168. private:
  169. /* 时间片 */
  170. void Tick()
  171. {
  172. m_mutex.lock();
  173. /* 取出这个时间点的函数,循环执行 */
  174. auto listTask = std::move(m_firstLevelWheel[m_currentFirstIndex]);
  175. m_firstLevelWheel[m_currentFirstIndex].clear();
  176. m_mutex.unlock();
  177. /* 将任务加入到线程池中执行 */
  178. for (const auto& task : listTask) {
  179. CPPTP.add_task(task.func);
  180. }
  181. /* 将时间点向前 + 1 */
  182. m_mutex.lock();
  183. m_currentFirstIndex = (m_currentFirstIndex + 1) % m_firstLevelCount;
  184. /* 判断第一层时间轮是否走完了 */
  185. if(m_currentFirstIndex == 0)
  186. {
  187. /* 第一个轮走完,取出第二层的下一个时间点 */
  188. m_currentSecondIndex = (m_currentSecondIndex + 1) % m_secondLevelCount;
  189. if(m_currentSecondIndex == 0)
  190. {
  191. /* 第二层时间轮也走完了,就取出第三个时间轮的下一个时间点 */
  192. m_currentThirdIndex = (m_currentThirdIndex + 1) % m_thirdLevelCount;
  193. /* 清空第二层时间轮 */
  194. m_secondLevelWheel.clear();
  195. for (auto& task : m_thirdLevelWheel[m_currentThirdIndex])
  196. {
  197. m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
  198. }
  199. m_thirdLevelWheel[m_currentThirdIndex].clear();
  200. }
  201. /* 清空第一层时间轮 */
  202. m_firstLevelWheel.clear();
  203. for (auto& task : m_secondLevelWheel[m_currentSecondIndex])
  204. {
  205. m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
  206. }
  207. m_secondLevelWheel[m_currentSecondIndex].clear();
  208. }
  209. m_mutex.unlock();
  210. /* 将循环定时的任务重新加入到时间轮中 */
  211. for(auto& task : listTask)
  212. {
  213. if(task.is_loop)
  214. {
  215. AddTask(task.interval, task.func, true);
  216. }
  217. }
  218. m_currentIndex = (m_currentIndex + 1) % m_wheel_size;
  219. }
  220. private:
  221. long m_max_interval = 0; /* 最大的定时时间长度,等于下面两个之和,超过就报错 */
  222. long m_wheel_size; /* 时间轮最大的轮训次数 */
  223. long m_interval_ms; /* 每个时间点的间隔秒数,这里是1ms */
  224. const long m_tick_us = 100; /* 管理线程跳动时间是100us */
  225. const long m_firstLevelCount = 1000; /* 第一层级的时间轮的个数 */
  226. const long m_secondLevelCount = 60; /* 第二层级的时间轮的个数 */
  227. const long m_thirdLevelCount = 60; /* 第三层级的时间轮的个数 */
  228. std::vector<std::list<Task>> m_firstLevelWheel; /* 第一层级的时间轮,这里是ms等级,总共1000个 */
  229. std::vector<std::list<Task>> m_secondLevelWheel; /* 第二层级的时间轮,这里是秒的等级,总共60个 */
  230. std::vector<std::list<Task>> m_thirdLevelWheel; /* 第三层级的时间轮,这里是分钟的等级,总共60个 */
  231. long m_currentFirstIndex; /* 现在的第一层时间点 */
  232. long m_currentSecondIndex; /* 现在的第二层位置 */
  233. long m_currentThirdIndex; /* 现在的第三层位置 */
  234. long m_currentIndex; /* 现在的时间点 */
  235. bool m_running = false; /* 运行标志位 */
  236. std::thread m_thread;
  237. std::mutex m_mutex;
  238. };