123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- struct TaskTimerPoint
- {
- long firstLevelIndex = 0;
- long secondLevelIndex = 0;
- long thirdLevelIndex = 0;
- TaskTimerPoint() = default;
- TaskTimerPoint& operator=(const TaskTimerPoint& other) {
- firstLevelIndex = other.firstLevelIndex;
- secondLevelIndex = other.secondLevelIndex;
- thirdLevelIndex = other.thirdLevelIndex;
- return *this;
- }
- };
- using TaskFunc = std::function<void()>;
- struct Task {
- bool is_loop;
- long interval;
- TaskFunc func;
- TaskTimerPoint timerPoint;
- };
- class TimerWheel {
- public:
-
-
- explicit TimerWheel()
- : m_currentFirstIndex(0),
- m_currentSecondIndex(0),
- m_currentThirdIndex(0),
- m_currentIndex(0)
- {
- m_wheel_size = 1000 * 60 * 60;
- m_interval_ms = 1;
- m_firstLevelWheel.resize(1000);
- m_secondLevelWheel.resize(60);
- m_thirdLevelWheel.resize(60);
- }
- ~TimerWheel() {
- Stop();
- }
-
- void Start()
- {
- if (m_running) {
- return;
- }
- m_running = true;
-
- m_thread = std::thread([this]() {
- std::chrono::steady_clock::time_point lastTime = std::chrono::steady_clock::now();
- while (m_running) {
-
- std::this_thread::sleep_for(std::chrono::microseconds(m_tick_us));
- std::chrono::steady_clock::time_point nowTime = std::chrono::steady_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(nowTime - lastTime);
- if(duration.count() >= 900) {
- Tick();
- lastTime = nowTime;
- }
-
- }
- FMTLOG_INFO("TimerWheel thread exit");
- });
-
- }
- void Stop()
- {
- if (!m_running) {
- return;
- }
- m_running = false;
- if (m_thread.joinable()) {
- m_thread.join();
- }
- }
-
- template<typename F, typename... Args>
- std::function<void()> bindTask(int timeout_ms, F&& f, Args&&... args)
- {
- return std::bind(std::forward<F>(f), std::forward<Args>(args)...);
- }
-
-
- void AddTask(long timeout_ms, std::function<void()> func, bool is_loop = false)
- {
-
- std::lock_guard<std::mutex> lock(m_mutex);
-
- long ticks = timeout_ms / m_interval_ms;
- if(ticks > m_wheel_size) {
- FMTLOG_WARN("timeout is too long");
- return;
- }
- Task task;
- task.is_loop = is_loop;
- task.interval = timeout_ms;
- task.func = func;
- task.timerPoint.firstLevelIndex = 0;
- task.timerPoint.secondLevelIndex = 0;
- task.timerPoint.thirdLevelIndex = 0;
-
-
- long index = m_currentIndex + ticks;
-
- index = index % m_wheel_size;
-
- task.timerPoint.firstLevelIndex = index % m_firstLevelCount;
- task.timerPoint.secondLevelIndex = (index / m_firstLevelCount) % m_secondLevelCount;
- task.timerPoint.thirdLevelIndex = (index / m_firstLevelCount / m_secondLevelCount) % m_thirdLevelCount;
-
- if(task.timerPoint.thirdLevelIndex != m_currentThirdIndex)
- {
-
- m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
- }
-
- else
- {
- if(task.timerPoint.secondLevelIndex < m_currentSecondIndex ||
- (task.timerPoint.secondLevelIndex == m_currentSecondIndex && task.timerPoint.firstLevelIndex < m_currentFirstIndex))
- {
-
- m_thirdLevelWheel[task.timerPoint.thirdLevelIndex].push_back(task);
- }
-
- else if(task.timerPoint.secondLevelIndex > m_currentFirstIndex)
- {
-
- m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
- }
- else
- {
-
- m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
- }
- }
-
- }
- private:
-
- void Tick()
- {
- m_mutex.lock();
-
- auto listTask = std::move(m_firstLevelWheel[m_currentFirstIndex]);
- m_firstLevelWheel[m_currentFirstIndex].clear();
- m_mutex.unlock();
-
- for (const auto& task : listTask) {
- CPPTP.add_task(task.func);
- }
-
- m_mutex.lock();
- m_currentFirstIndex = (m_currentFirstIndex + 1) % m_firstLevelCount;
-
- if(m_currentFirstIndex == 0)
- {
-
- m_currentSecondIndex = (m_currentSecondIndex + 1) % m_secondLevelCount;
- if(m_currentSecondIndex == 0)
- {
-
- m_currentThirdIndex = (m_currentThirdIndex + 1) % m_thirdLevelCount;
-
- m_secondLevelWheel.clear();
- for (auto& task : m_thirdLevelWheel[m_currentThirdIndex])
- {
- m_secondLevelWheel[task.timerPoint.secondLevelIndex].push_back(task);
- }
- m_thirdLevelWheel[m_currentThirdIndex].clear();
- }
-
- m_firstLevelWheel.clear();
- for (auto& task : m_secondLevelWheel[m_currentSecondIndex])
- {
- m_firstLevelWheel[task.timerPoint.firstLevelIndex].push_back(task);
- }
- m_secondLevelWheel[m_currentSecondIndex].clear();
- }
- m_mutex.unlock();
-
- for(auto& task : listTask)
- {
- if(task.is_loop)
- {
- AddTask(task.interval, task.func, true);
- }
- }
- m_currentIndex = (m_currentIndex + 1) % m_wheel_size;
-
- }
- private:
- long m_max_interval = 0;
- long m_wheel_size;
- long m_interval_ms;
- const long m_tick_us = 100;
- const long m_firstLevelCount = 1000;
- const long m_secondLevelCount = 60;
- const long m_thirdLevelCount = 60;
- std::vector<std::list<Task>> m_firstLevelWheel;
- std::vector<std::list<Task>> m_secondLevelWheel;
- std::vector<std::list<Task>> m_thirdLevelWheel;
- long m_currentFirstIndex;
- long m_currentSecondIndex;
- long m_currentThirdIndex;
- long m_currentIndex;
- bool m_running = false;
- std::thread m_thread;
- std::mutex m_mutex;
-
- };
|