123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- #include <chrono>
- #include <functional>
- #include <list>
- #include <mutex>
- #include <thread>
- #include <vector>
- #include "ThreadPool/ThreadPool.h"
- #include "FmtLog/fmtlog.h"
- /**
- * @brief 这是一个时间轮定时器的实现,将任务添加到时间轮中,时间轮会在指定的时间点执行任务
- * 这个定时器的最大定时时间是1小时,最小时间间隔是1ms
- *
- *
- * 设计说明:
- * 1、时间轮的时间点是固定的,每个时间点的时间间隔是1ms,最大时间点是1000 * 60 * 60个,也就是1小时
- * 2、任务是挂在一个个时间点上的,同一个时间点可以有多个任务
- * 3、时间轮的执行是在一个新的线程中执行的,每隔1ms检测一次是否有任务需要执行
- * 4、当前时间点是下一个将要执行的时间点,新加入的立即执行的任务就会加入到当前时间点上
- * 5、时间轮的时间点是循环的,当到达最大时间点时,会重新从0开始
- * 6、管理线程是100us运行一次,在运行时会进行时间累积,相差1ms就会执行一次任务
- *
- * 缺点:
- * 1、时间轮的时间点是固定的,如果任务的时间点不在时间轮的时间点上,就会被延迟执行
- */
- /** ==================================================================
- * ************************** 任务结构体 ******************************
- * ================================================================== */
- /**
- * 任务时间点结构体,用来标识任务在时间轮中的位置
- */
- struct TaskTimerPoint
- {
- long firstLevelIndex = 0;
- long secondLevelIndex = 0;
- long thirdLevelIndex = 0;
- TaskTimerPoint() = default;
- TaskTimerPoint& operator=(const TaskTimerPoint& other) {
- firstLevelIndex = other.firstLevelIndex;
- secondLevelIndex = other.secondLevelIndex;
- thirdLevelIndex = other.thirdLevelIndex;
- return *this;
- }
- };
- /* 定义函数指针 */
- using TaskFunc = std::function<void()>;
- /* 定义任务结构体 */
- struct Task {
- bool is_loop; /* 是否循环定时 */
- long interval; /* 定时间隔 */
- TaskFunc func; /* 任务函数 */
- TaskTimerPoint timerPoint; /* 任务时间点,在时间轮中的位置 */
- };
- /** ==================================================================
- * ***************************** 任务类 ******************************
- * ================================================================== */
- class TimerWheel {
- public:
-
- /* 构造函数,第一个参数是任务时间点容器最大数量,第二个参数是最低检测时间单位 */
- explicit TimerWheel()
- : m_currentFirstIndex(0),
- m_currentSecondIndex(0),
- m_currentThirdIndex(0),
- m_currentIndex(0)
- {
- m_wheel_size = 1000 * 60 * 60; /* 1小时定时间隔 */
- m_interval_ms = 1; /* 1ms */
- m_firstLevelWheel.resize(1000);
- m_secondLevelWheel.resize(60);
- m_thirdLevelWheel.resize(60);
- }
- ~TimerWheel() {
- Stop();
- }
- /* 开启时间轮 */
- void Start()
- {
- if (m_running) {
- return;
- }
- m_running = true;
- /* 开启新线程,定时检测队列 */
- m_thread = std::thread([this]() {
- std::chrono::steady_clock::time_point lastTime = std::chrono::steady_clock::now();
- while (m_running) {
- /* 100us计算一次距离上次执行的间隔时间,超过1ms就执行一次 */
- std::this_thread::sleep_for(std::chrono::microseconds(m_tick_us));
- std::chrono::steady_clock::time_point nowTime = std::chrono::steady_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(nowTime - lastTime);
- if(duration.count() >= 900) {
- Tick();
- lastTime = nowTime;
- }
-
- }
- FMTLOG_INFO("TimerWheel thread exit");
- });
-
- }
- void Stop()
- {
- if (!m_running) {
- return;
- }
- m_running = false;
- if (m_thread.joinable()) {
- m_thread.join();
- }
- }
- /* 绑定任务函数和参数 */
- template<typename F, typename... Args>
- std::function<void()> bindTask(int timeout_ms, F&& f, Args&&... args)
- {
- return std::bind(std::forward<F>(f), std::forward<Args>(args)...);
- }
-
- /* 添加任务函数 */
- void AddTask(long timeout_ms, std::function<void()> func, bool is_loop = false)
- {
- /* 这里需要将当前时间点一起上锁,防止立即执行的任务会错过 */
- std::lock_guard<std::mutex> lock(m_mutex);
- /* 计算出定时的时间点数目 */
- long ticks = timeout_ms / m_interval_ms;
- if(ticks > m_wheel_size) {
- FMTLOG_WARN("timeout is too long");
- return;
- }
- Task task;
- task.is_loop = is_loop;
- task.interval = timeout_ms;
- task.func = func;
- task.timerPoint.firstLevelIndex = 0;
- task.timerPoint.secondLevelIndex = 0;
- task.timerPoint.thirdLevelIndex = 0;
- /* 加上当前时间点的位置偏移,计算出需要插入的时间点 */
- // long index = m_currentFirstIndex + m_currentSecondIndex * m_firstLevelCount + m_currentThirdIndex * m_secondLevelCount * m_firstLevelCount + ticks;
- long index = m_currentIndex + ticks;
- /* 计算出需要的位置,如果超出了最大范围就放到过去的时间点中 */
- index = index % m_wheel_size;
- /* 计算出时间点的位置 */
- task.timerPoint.firstLevelIndex = index % m_firstLevelCount;
- task.timerPoint.secondLevelIndex = (index / m_firstLevelCount) % m_secondLevelCount;
- task.timerPoint.thirdLevelIndex = (index / m_firstLevelCount / m_secondLevelCount) % m_thirdLevelCount;
- /* 加入到时间轮的任务队列中 */
- if(task.timerPoint.thirdLevelIndex != m_currentThirdIndex)
- {
- /* 不在当前正在轮训的第三层时间点内,直接加入到对应的时间点中即可 */
- m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
- }
- /* 在当前正在轮训的第三层时间点内,继续判断是否已经是过去的时间点 */
- else
- {
- if(task.timerPoint.secondLevelIndex < m_currentSecondIndex ||
- (task.timerPoint.secondLevelIndex == m_currentSecondIndex && task.timerPoint.firstLevelIndex < m_currentFirstIndex))
- {
- /* 在当前正在轮训的第三层时间点内,但是已经是过去的时间点,加入到当前第三层的队列中 */
- m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
- }
- /* 在当前第三层还未执行的时间点内 */
- else if(task.timerPoint.secondLevelIndex > m_currentFirstIndex)
- {
- /* 在第二轮未执行的点中,加入到第二轮队列中 */
- m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
- }
- else
- {
- /* 在当前正在轮训的第二层时间点内,且是未来的时间点,加入到对应的第一层时间点中 */
- m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
- }
- }
-
- }
- private:
- /* 时间片 */
- void Tick()
- {
- m_mutex.lock();
- /* 取出这个时间点的函数,循环执行 */
- auto listTask = std::move(m_firstLevelWheel[m_currentFirstIndex]);
- m_firstLevelWheel[m_currentFirstIndex].clear();
- m_mutex.unlock();
- /* 将任务加入到线程池中执行 */
- for (const auto& task : listTask) {
- CPPTP.add_task(task.func);
- }
- /* 将时间点向前 + 1 */
- m_mutex.lock();
- m_currentFirstIndex = (m_currentFirstIndex + 1) % m_firstLevelCount;
- /* 判断第一层时间轮是否走完了 */
- if(m_currentFirstIndex == 0)
- {
- /* 第一个轮走完,取出第二层的下一个时间点 */
- m_currentSecondIndex = (m_currentSecondIndex + 1) % m_secondLevelCount;
- if(m_currentSecondIndex == 0)
- {
- /* 第二层时间轮也走完了,就取出第三个时间轮的下一个时间点 */
- m_currentThirdIndex = (m_currentThirdIndex + 1) % m_thirdLevelCount;
- /* 清空第二层时间轮 */
- m_secondLevelWheel.clear();
- for (auto& task : m_thirdLevelWheel[m_currentThirdIndex])
- {
- m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
- }
- m_thirdLevelWheel[m_currentThirdIndex].clear();
- }
- /* 清空第一层时间轮 */
- m_firstLevelWheel.clear();
- for (auto& task : m_secondLevelWheel[m_currentSecondIndex])
- {
- m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
- }
- m_secondLevelWheel[m_currentSecondIndex].clear();
- }
- m_mutex.unlock();
- /* 将循环定时的任务重新加入到时间轮中 */
- for(auto& task : listTask)
- {
- if(task.is_loop)
- {
- AddTask(task.interval, task.func, true);
- }
- }
- m_currentIndex = (m_currentIndex + 1) % m_wheel_size;
-
- }
- private:
- long m_max_interval = 0; /* 最大的定时时间长度,等于下面两个之和,超过就报错 */
- long m_wheel_size; /* 时间轮最大的轮训次数 */
- long m_interval_ms; /* 每个时间点的间隔秒数,这里是1ms */
- const long m_tick_us = 100; /* 管理线程跳动时间是100us */
- const long m_firstLevelCount = 1000; /* 第一层级的时间轮的个数 */
- const long m_secondLevelCount = 60; /* 第二层级的时间轮的个数 */
- const long m_thirdLevelCount = 60; /* 第三层级的时间轮的个数 */
- std::vector<std::list<Task>> m_firstLevelWheel; /* 第一层级的时间轮,这里是ms等级,总共1000个 */
- std::vector<std::list<Task>> m_secondLevelWheel; /* 第二层级的时间轮,这里是秒的等级,总共60个 */
- std::vector<std::list<Task>> m_thirdLevelWheel; /* 第三层级的时间轮,这里是分钟的等级,总共60个 */
- long m_currentFirstIndex; /* 现在的第一层时间点 */
- long m_currentSecondIndex; /* 现在的第二层位置 */
- long m_currentThirdIndex; /* 现在的第三层位置 */
- long m_currentIndex; /* 现在的时间点 */
- bool m_running = false; /* 运行标志位 */
- std::thread m_thread;
- std::mutex m_mutex;
-
- };
|