#include #include #include #include #include #include #include "ThreadPool/ThreadPool.h" #include "FmtLog/fmtlog.h" /** * @brief 这是一个时间轮定时器的实现,将任务添加到时间轮中,时间轮会在指定的时间点执行任务 * 这个定时器的最大定时时间是1小时,最小时间间隔是1ms * * * 设计说明: * 1、时间轮的时间点是固定的,每个时间点的时间间隔是1ms,最大时间点是1000 * 60 * 60个,也就是1小时 * 2、任务是挂在一个个时间点上的,同一个时间点可以有多个任务 * 3、时间轮的执行是在一个新的线程中执行的,每隔1ms检测一次是否有任务需要执行 * 4、当前时间点是下一个将要执行的时间点,新加入的立即执行的任务就会加入到当前时间点上 * 5、时间轮的时间点是循环的,当到达最大时间点时,会重新从0开始 * 6、管理线程是100us运行一次,在运行时会进行时间累积,相差1ms就会执行一次任务 * * 缺点: * 1、时间轮的时间点是固定的,如果任务的时间点不在时间轮的时间点上,就会被延迟执行 */ /** ================================================================== * ************************** 任务结构体 ****************************** * ================================================================== */ /** * 任务时间点结构体,用来标识任务在时间轮中的位置 */ struct TaskTimerPoint { long firstLevelIndex = 0; long secondLevelIndex = 0; long thirdLevelIndex = 0; TaskTimerPoint() = default; TaskTimerPoint& operator=(const TaskTimerPoint& other) { firstLevelIndex = other.firstLevelIndex; secondLevelIndex = other.secondLevelIndex; thirdLevelIndex = other.thirdLevelIndex; return *this; } }; /* 定义函数指针 */ using TaskFunc = std::function; /* 定义任务结构体 */ struct Task { bool is_loop; /* 是否循环定时 */ long interval; /* 定时间隔 */ TaskFunc func; /* 任务函数 */ TaskTimerPoint timerPoint; /* 任务时间点,在时间轮中的位置 */ }; /** ================================================================== * ***************************** 任务类 ****************************** * ================================================================== */ class TimerWheel { public: /* 构造函数,第一个参数是任务时间点容器最大数量,第二个参数是最低检测时间单位 */ explicit TimerWheel() : m_currentFirstIndex(0), m_currentSecondIndex(0), m_currentThirdIndex(0), m_currentIndex(0) { m_wheel_size = 1000 * 60 * 60; /* 1小时定时间隔 */ m_interval_ms = 1; /* 1ms */ m_firstLevelWheel.resize(1000); m_secondLevelWheel.resize(60); m_thirdLevelWheel.resize(60); } ~TimerWheel() { Stop(); } /* 开启时间轮 */ void Start() { if (m_running) { return; } m_running = true; /* 开启新线程,定时检测队列 */ m_thread = std::thread([this]() { std::chrono::steady_clock::time_point lastTime = std::chrono::steady_clock::now(); while (m_running) { /* 100us计算一次距离上次执行的间隔时间,超过1ms就执行一次 */ std::this_thread::sleep_for(std::chrono::microseconds(m_tick_us)); std::chrono::steady_clock::time_point nowTime = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(nowTime - lastTime); if(duration.count() >= 900) { Tick(); lastTime = nowTime; } } FMTLOG_INFO("TimerWheel thread exit"); }); } void Stop() { if (!m_running) { return; } m_running = false; if (m_thread.joinable()) { m_thread.join(); } } /* 绑定任务函数和参数 */ template std::function bindTask(int timeout_ms, F&& f, Args&&... args) { return std::bind(std::forward(f), std::forward(args)...); } /* 添加任务函数 */ void AddTask(long timeout_ms, std::function func, bool is_loop = false) { /* 这里需要将当前时间点一起上锁,防止立即执行的任务会错过 */ std::lock_guard lock(m_mutex); /* 计算出定时的时间点数目 */ long ticks = timeout_ms / m_interval_ms; if(ticks > m_wheel_size) { FMTLOG_WARN("timeout is too long"); return; } Task task; task.is_loop = is_loop; task.interval = timeout_ms; task.func = func; task.timerPoint.firstLevelIndex = 0; task.timerPoint.secondLevelIndex = 0; task.timerPoint.thirdLevelIndex = 0; /* 加上当前时间点的位置偏移,计算出需要插入的时间点 */ // long index = m_currentFirstIndex + m_currentSecondIndex * m_firstLevelCount + m_currentThirdIndex * m_secondLevelCount * m_firstLevelCount + ticks; long index = m_currentIndex + ticks; /* 计算出需要的位置,如果超出了最大范围就放到过去的时间点中 */ index = index % m_wheel_size; /* 计算出时间点的位置 */ task.timerPoint.firstLevelIndex = index % m_firstLevelCount; task.timerPoint.secondLevelIndex = (index / m_firstLevelCount) % m_secondLevelCount; task.timerPoint.thirdLevelIndex = (index / m_firstLevelCount / m_secondLevelCount) % m_thirdLevelCount; /* 加入到时间轮的任务队列中 */ if(task.timerPoint.thirdLevelIndex != m_currentThirdIndex) { /* 不在当前正在轮训的第三层时间点内,直接加入到对应的时间点中即可 */ m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task); } /* 在当前正在轮训的第三层时间点内,继续判断是否已经是过去的时间点 */ else { if(task.timerPoint.secondLevelIndex < m_currentSecondIndex || (task.timerPoint.secondLevelIndex == m_currentSecondIndex && task.timerPoint.firstLevelIndex < m_currentFirstIndex)) { /* 在当前正在轮训的第三层时间点内,但是已经是过去的时间点,加入到当前第三层的队列中 */ m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task); } /* 在当前第三层还未执行的时间点内 */ else if(task.timerPoint.secondLevelIndex > m_currentFirstIndex) { /* 在第二轮未执行的点中,加入到第二轮队列中 */ m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task); } else { /* 在当前正在轮训的第二层时间点内,且是未来的时间点,加入到对应的第一层时间点中 */ m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task); } } } private: /* 时间片 */ void Tick() { m_mutex.lock(); /* 取出这个时间点的函数,循环执行 */ auto listTask = std::move(m_firstLevelWheel[m_currentFirstIndex]); m_firstLevelWheel[m_currentFirstIndex].clear(); m_mutex.unlock(); /* 将任务加入到线程池中执行 */ for (const auto& task : listTask) { CPPTP.add_task(task.func); } /* 将时间点向前 + 1 */ m_mutex.lock(); m_currentFirstIndex = (m_currentFirstIndex + 1) % m_firstLevelCount; /* 判断第一层时间轮是否走完了 */ if(m_currentFirstIndex == 0) { /* 第一个轮走完,取出第二层的下一个时间点 */ m_currentSecondIndex = (m_currentSecondIndex + 1) % m_secondLevelCount; if(m_currentSecondIndex == 0) { /* 第二层时间轮也走完了,就取出第三个时间轮的下一个时间点 */ m_currentThirdIndex = (m_currentThirdIndex + 1) % m_thirdLevelCount; /* 清空第二层时间轮 */ m_secondLevelWheel.clear(); for (auto& task : m_thirdLevelWheel[m_currentThirdIndex]) { m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task); } m_thirdLevelWheel[m_currentThirdIndex].clear(); } /* 清空第一层时间轮 */ m_firstLevelWheel.clear(); for (auto& task : m_secondLevelWheel[m_currentSecondIndex]) { m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task); } m_secondLevelWheel[m_currentSecondIndex].clear(); } m_mutex.unlock(); /* 将循环定时的任务重新加入到时间轮中 */ for(auto& task : listTask) { if(task.is_loop) { AddTask(task.interval, task.func, true); } } m_currentIndex = (m_currentIndex + 1) % m_wheel_size; } private: long m_max_interval = 0; /* 最大的定时时间长度,等于下面两个之和,超过就报错 */ long m_wheel_size; /* 时间轮最大的轮训次数 */ long m_interval_ms; /* 每个时间点的间隔秒数,这里是1ms */ const long m_tick_us = 100; /* 管理线程跳动时间是100us */ const long m_firstLevelCount = 1000; /* 第一层级的时间轮的个数 */ const long m_secondLevelCount = 60; /* 第二层级的时间轮的个数 */ const long m_thirdLevelCount = 60; /* 第三层级的时间轮的个数 */ std::vector> m_firstLevelWheel; /* 第一层级的时间轮,这里是ms等级,总共1000个 */ std::vector> m_secondLevelWheel; /* 第二层级的时间轮,这里是秒的等级,总共60个 */ std::vector> m_thirdLevelWheel; /* 第三层级的时间轮,这里是分钟的等级,总共60个 */ long m_currentFirstIndex; /* 现在的第一层时间点 */ long m_currentSecondIndex; /* 现在的第二层位置 */ long m_currentThirdIndex; /* 现在的第三层位置 */ long m_currentIndex; /* 现在的时间点 */ bool m_running = false; /* 运行标志位 */ std::thread m_thread; std::mutex m_mutex; };