|
@@ -0,0 +1,267 @@
|
|
|
|
+
|
|
|
|
+#include <chrono>
|
|
|
|
+#include <functional>
|
|
|
|
+#include <list>
|
|
|
|
+#include <mutex>
|
|
|
|
+#include <thread>
|
|
|
|
+#include <vector>
|
|
|
|
+
|
|
|
|
+#include "ThreadPool/ThreadPool.h"
|
|
|
|
+#include "FmtLog/fmtlog.h"
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * @brief 这是一个时间轮定时器的实现,将任务添加到时间轮中,时间轮会在指定的时间点执行任务
|
|
|
|
+ * 这个定时器的最大定时时间是1小时,最小时间间隔是1ms
|
|
|
|
+ *
|
|
|
|
+ *
|
|
|
|
+ * 设计说明:
|
|
|
|
+ * 1、时间轮的时间点是固定的,每个时间点的时间间隔是1ms,最大时间点是1000 * 60 * 60个,也就是1小时
|
|
|
|
+ * 2、任务是挂在一个个时间点上的,同一个时间点可以有多个任务
|
|
|
|
+ * 3、时间轮的执行是在一个新的线程中执行的,每隔1ms检测一次是否有任务需要执行
|
|
|
|
+ * 4、当前时间点是下一个将要执行的时间点,新加入的立即执行的任务就会加入到当前时间点上
|
|
|
|
+ * 5、时间轮的时间点是循环的,当到达最大时间点时,会重新从0开始
|
|
|
|
+ * 6、管理线程是100us运行一次,在运行时会进行时间累积,相差1ms就会执行一次任务
|
|
|
|
+ *
|
|
|
|
+ * 缺点:
|
|
|
|
+ * 1、时间轮的时间点是固定的,如果任务的时间点不在时间轮的时间点上,就会被延迟执行
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+/** ==================================================================
|
|
|
|
+ * ************************** 任务结构体 ******************************
|
|
|
|
+ * ================================================================== */
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * 任务时间点结构体,用来标识任务在时间轮中的位置
|
|
|
|
+ */
|
|
|
|
+struct TaskTimerPoint
|
|
|
|
+{
|
|
|
|
+ long firstLevelIndex = 0;
|
|
|
|
+ long secondLevelIndex = 0;
|
|
|
|
+ long thirdLevelIndex = 0;
|
|
|
|
+
|
|
|
|
+ TaskTimerPoint() = default;
|
|
|
|
+ TaskTimerPoint& operator=(const TaskTimerPoint& other) {
|
|
|
|
+ firstLevelIndex = other.firstLevelIndex;
|
|
|
|
+ secondLevelIndex = other.secondLevelIndex;
|
|
|
|
+ thirdLevelIndex = other.thirdLevelIndex;
|
|
|
|
+ return *this;
|
|
|
|
+ }
|
|
|
|
+};
|
|
|
|
+
|
|
|
|
+/* 定义函数指针 */
|
|
|
|
+using TaskFunc = std::function<void()>;
|
|
|
|
+/* 定义任务结构体 */
|
|
|
|
+struct Task {
|
|
|
|
+ bool is_loop; /* 是否循环定时 */
|
|
|
|
+ long interval; /* 定时间隔 */
|
|
|
|
+ TaskFunc func; /* 任务函数 */
|
|
|
|
+ TaskTimerPoint timerPoint; /* 任务时间点,在时间轮中的位置 */
|
|
|
|
+};
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+/** ==================================================================
|
|
|
|
+ * ***************************** 任务类 ******************************
|
|
|
|
+ * ================================================================== */
|
|
|
|
+
|
|
|
|
+class TimerWheel {
|
|
|
|
+public:
|
|
|
|
+
|
|
|
|
+ /* 构造函数,第一个参数是任务时间点容器最大数量,第二个参数是最低检测时间单位 */
|
|
|
|
+ explicit TimerWheel()
|
|
|
|
+ : m_currentFirstIndex(0),
|
|
|
|
+ m_currentSecondIndex(0),
|
|
|
|
+ m_currentThirdIndex(0),
|
|
|
|
+ m_currentIndex(0)
|
|
|
|
+ {
|
|
|
|
+ m_wheel_size = 1000 * 60 * 60; /* 1小时定时间隔 */
|
|
|
|
+ m_interval_ms = 1; /* 1ms */
|
|
|
|
+ m_firstLevelWheel.resize(1000);
|
|
|
|
+ m_secondLevelWheel.resize(60);
|
|
|
|
+ m_thirdLevelWheel.resize(60);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ~TimerWheel() {
|
|
|
|
+ Stop();
|
|
|
|
+ }
|
|
|
|
+ /* 开启时间轮 */
|
|
|
|
+ void Start()
|
|
|
|
+ {
|
|
|
|
+ if (m_running) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ m_running = true;
|
|
|
|
+ /* 开启新线程,定时检测队列 */
|
|
|
|
+ m_thread = std::thread([this]() {
|
|
|
|
+ std::chrono::steady_clock::time_point lastTime = std::chrono::steady_clock::now();
|
|
|
|
+ while (m_running) {
|
|
|
|
+ /* 100us计算一次距离上次执行的间隔时间,超过1ms就执行一次 */
|
|
|
|
+ std::this_thread::sleep_for(std::chrono::microseconds(m_tick_us));
|
|
|
|
+ std::chrono::steady_clock::time_point nowTime = std::chrono::steady_clock::now();
|
|
|
|
+ auto duration = std::chrono::duration_cast<std::chrono::microseconds>(nowTime - lastTime);
|
|
|
|
+ if(duration.count() >= 900) {
|
|
|
|
+ Tick();
|
|
|
|
+ lastTime = nowTime;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ FMTLOG_INFO("TimerWheel thread exit");
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void Stop()
|
|
|
|
+ {
|
|
|
|
+ if (!m_running) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ m_running = false;
|
|
|
|
+ if (m_thread.joinable()) {
|
|
|
|
+ m_thread.join();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ /* 绑定任务函数和参数 */
|
|
|
|
+ template<typename F, typename... Args>
|
|
|
|
+ std::function<void()> bindTask(int timeout_ms, F&& f, Args&&... args)
|
|
|
|
+ {
|
|
|
|
+ return std::bind(std::forward<F>(f), std::forward<Args>(args)...);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* 添加任务函数 */
|
|
|
|
+ void AddTask(long timeout_ms, std::function<void()> func, bool is_loop = false)
|
|
|
|
+ {
|
|
|
|
+ /* 这里需要将当前时间点一起上锁,防止立即执行的任务会错过 */
|
|
|
|
+ std::lock_guard<std::mutex> lock(m_mutex);
|
|
|
|
+ /* 计算出定时的时间点数目 */
|
|
|
|
+ long ticks = timeout_ms / m_interval_ms;
|
|
|
|
+ if(ticks > m_wheel_size) {
|
|
|
|
+ FMTLOG_WARN("timeout is too long");
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ Task task;
|
|
|
|
+ task.is_loop = is_loop;
|
|
|
|
+ task.interval = timeout_ms;
|
|
|
|
+ task.func = func;
|
|
|
|
+ task.timerPoint.firstLevelIndex = 0;
|
|
|
|
+ task.timerPoint.secondLevelIndex = 0;
|
|
|
|
+ task.timerPoint.thirdLevelIndex = 0;
|
|
|
|
+ /* 加上当前时间点的位置偏移,计算出需要插入的时间点 */
|
|
|
|
+ // long index = m_currentFirstIndex + m_currentSecondIndex * m_firstLevelCount + m_currentThirdIndex * m_secondLevelCount * m_firstLevelCount + ticks;
|
|
|
|
+ long index = m_currentIndex + ticks;
|
|
|
|
+ /* 计算出需要的位置,如果超出了最大范围就放到过去的时间点中 */
|
|
|
|
+ index = index % m_wheel_size;
|
|
|
|
+ /* 计算出时间点的位置 */
|
|
|
|
+ task.timerPoint.firstLevelIndex = index % m_firstLevelCount;
|
|
|
|
+ task.timerPoint.secondLevelIndex = (index / m_firstLevelCount) % m_secondLevelCount;
|
|
|
|
+ task.timerPoint.thirdLevelIndex = (index / m_firstLevelCount / m_secondLevelCount) % m_thirdLevelCount;
|
|
|
|
+ /* 加入到时间轮的任务队列中 */
|
|
|
|
+ if(task.timerPoint.thirdLevelIndex != m_currentThirdIndex)
|
|
|
|
+ {
|
|
|
|
+ /* 不在当前正在轮训的第三层时间点内,直接加入到对应的时间点中即可 */
|
|
|
|
+ m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
|
|
|
|
+ }
|
|
|
|
+ /* 在当前正在轮训的第三层时间点内,继续判断是否已经是过去的时间点 */
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ if(task.timerPoint.secondLevelIndex < m_currentSecondIndex ||
|
|
|
|
+ (task.timerPoint.secondLevelIndex == m_currentSecondIndex && task.timerPoint.firstLevelIndex < m_currentFirstIndex))
|
|
|
|
+ {
|
|
|
|
+ /* 在当前正在轮训的第三层时间点内,但是已经是过去的时间点,加入到当前第三层的队列中 */
|
|
|
|
+ m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
|
|
|
|
+ }
|
|
|
|
+ /* 在当前第三层还未执行的时间点内 */
|
|
|
|
+ else if(task.timerPoint.secondLevelIndex > m_currentFirstIndex)
|
|
|
|
+ {
|
|
|
|
+ /* 在第二轮未执行的点中,加入到第二轮队列中 */
|
|
|
|
+ m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ {
|
|
|
|
+ /* 在当前正在轮训的第二层时间点内,且是未来的时间点,加入到对应的第一层时间点中 */
|
|
|
|
+ m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+private:
|
|
|
|
+ /* 时间片 */
|
|
|
|
+ void Tick()
|
|
|
|
+ {
|
|
|
|
+ m_mutex.lock();
|
|
|
|
+ /* 取出这个时间点的函数,循环执行 */
|
|
|
|
+ auto listTask = std::move(m_firstLevelWheel[m_currentFirstIndex]);
|
|
|
|
+ m_firstLevelWheel[m_currentFirstIndex].clear();
|
|
|
|
+ m_mutex.unlock();
|
|
|
|
+ /* 将任务加入到线程池中执行 */
|
|
|
|
+ for (const auto& task : listTask) {
|
|
|
|
+ CPPTP.add_task(task.func);
|
|
|
|
+ }
|
|
|
|
+ /* 将时间点向前 + 1 */
|
|
|
|
+ m_mutex.lock();
|
|
|
|
+ m_currentFirstIndex = (m_currentFirstIndex + 1) % m_firstLevelCount;
|
|
|
|
+ /* 判断第一层时间轮是否走完了 */
|
|
|
|
+ if(m_currentFirstIndex == 0)
|
|
|
|
+ {
|
|
|
|
+ /* 第一个轮走完,取出第二层的下一个时间点 */
|
|
|
|
+ m_currentSecondIndex = (m_currentSecondIndex + 1) % m_secondLevelCount;
|
|
|
|
+ if(m_currentSecondIndex == 0)
|
|
|
|
+ {
|
|
|
|
+ /* 第二层时间轮也走完了,就取出第三个时间轮的下一个时间点 */
|
|
|
|
+ m_currentThirdIndex = (m_currentThirdIndex + 1) % m_thirdLevelCount;
|
|
|
|
+ /* 清空第二层时间轮 */
|
|
|
|
+ m_secondLevelWheel.clear();
|
|
|
|
+ for (auto& task : m_thirdLevelWheel[m_currentThirdIndex])
|
|
|
|
+ {
|
|
|
|
+ m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
|
|
|
|
+ }
|
|
|
|
+ m_thirdLevelWheel[m_currentThirdIndex].clear();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* 清空第一层时间轮 */
|
|
|
|
+ m_firstLevelWheel.clear();
|
|
|
|
+ for (auto& task : m_secondLevelWheel[m_currentSecondIndex])
|
|
|
|
+ {
|
|
|
|
+ m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
|
|
|
|
+ }
|
|
|
|
+ m_secondLevelWheel[m_currentSecondIndex].clear();
|
|
|
|
+ }
|
|
|
|
+ m_mutex.unlock();
|
|
|
|
+ /* 将循环定时的任务重新加入到时间轮中 */
|
|
|
|
+ for(auto& task : listTask)
|
|
|
|
+ {
|
|
|
|
+ if(task.is_loop)
|
|
|
|
+ {
|
|
|
|
+ AddTask(task.interval, task.func, true);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ m_currentIndex = (m_currentIndex + 1) % m_wheel_size;
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+private:
|
|
|
|
+ long m_max_interval = 0; /* 最大的定时时间长度,等于下面两个之和,超过就报错 */
|
|
|
|
+ long m_wheel_size; /* 时间轮最大的轮训次数 */
|
|
|
|
+ long m_interval_ms; /* 每个时间点的间隔秒数,这里是1ms */
|
|
|
|
+ const long m_tick_us = 100; /* 管理线程跳动时间是100us */
|
|
|
|
+ const long m_firstLevelCount = 1000; /* 第一层级的时间轮的个数 */
|
|
|
|
+ const long m_secondLevelCount = 60; /* 第二层级的时间轮的个数 */
|
|
|
|
+ const long m_thirdLevelCount = 60; /* 第三层级的时间轮的个数 */
|
|
|
|
+ std::vector<std::list<Task>> m_firstLevelWheel; /* 第一层级的时间轮,这里是ms等级,总共1000个 */
|
|
|
|
+ std::vector<std::list<Task>> m_secondLevelWheel; /* 第二层级的时间轮,这里是秒的等级,总共60个 */
|
|
|
|
+ std::vector<std::list<Task>> m_thirdLevelWheel; /* 第三层级的时间轮,这里是分钟的等级,总共60个 */
|
|
|
|
+ long m_currentFirstIndex; /* 现在的第一层时间点 */
|
|
|
|
+ long m_currentSecondIndex; /* 现在的第二层位置 */
|
|
|
|
+ long m_currentThirdIndex; /* 现在的第三层位置 */
|
|
|
|
+ long m_currentIndex; /* 现在的时间点 */
|
|
|
|
+ bool m_running = false; /* 运行标志位 */
|
|
|
|
+ std::thread m_thread;
|
|
|
|
+ std::mutex m_mutex;
|
|
|
|
+
|
|
|
|
+};
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|