Browse Source

V0.2.21
1、开始修改时间轮

Apple 5 months ago
parent
commit
9488874a0f
4 changed files with 381 additions and 103 deletions
  1. 132 0
      common/Timer/TWTimer.h
  2. 195 0
      common/Timer/tw.cpp
  3. 54 0
      common/Timer/tw.h
  4. 0 103
      common/TimerWheel/TimerWheel.h

+ 132 - 0
common/Timer/TWTimer.h

@@ -0,0 +1,132 @@
+
+#include <chrono>
+#include <functional>
+#include <list>
+#include <mutex>
+#include <thread>
+#include <vector>
+#include <iostream>
+
+#include "ThreadPool.h"
+#include "spdlog/spdlog.h"
+#include "RingQueue.hpp"
+
+/**
+ * @brief 这是一个时间轮定时器的实现,将任务添加到时间轮中,时间轮会在指定的时间点执行任务
+ *        任务是一个函数,可以是lambda表达式,也可以是std::bind绑定的函数
+ *        任务是挂在一个个时间点上的,同一个时间点可以有多个任务,时间点的最大数目就是任务容器最大容量
+ *        间隔时间是1ms,最大定时时间是1000 * 60 * 60 ms,也就是1小时
+ * 
+ * 缺点:
+ *        1、时间轮的时间点是固定的,如果任务的时间点不在时间轮的时间点上,就会被延迟执行
+ *        2、任务执行是在这个线程中执行的,如果任务执行时间过长,会影响时间轮的执行
+ */
+
+
+/** ==================================================================
+ *
+ * ================================================================== */
+
+/* 定义函数指针 */
+using TaskFunc = std::function<void()>;
+/* 定义任务结构体 */
+struct Task {
+    bool is_loop;           /* 是否循环定时 */
+    TaskFunc func;          /* 任务函数 */
+    long interval;          /* 定时间隔 */
+};
+
+class TimerWheel {
+public:
+    
+    /* 构造函数,第一个参数是任务时间点容器最大数量,第二个参数是最低检测时间单位 */
+    explicit TimerWheel()
+        : m_current_index(0) {
+        m_wheel_size = 1000 * 60 * 60;      /* 1小时 */
+        m_interval_ms = 1000;               /* 1ms */
+        m_firstLevelWheel.resize(1000);
+        m_secondLevelWheel.resize(60);
+        m_thirdLevelWheel.resize(60);
+    }
+
+    ~TimerWheel() {
+        Stop();
+    }
+    /* 开启时间轮 */
+    void Start() {
+        if (m_running) {
+            return;
+        }
+        m_running = true;
+        /* 开启新线程,定时检测队列 */
+        m_thread = std::thread([this]() {
+            while (m_running) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(m_interval_ms));
+                Tick();
+            }
+        std::cout << "timer oooops!" << std::endl;
+            });
+        m_thread.detach();
+    }
+
+    void Stop() {
+        if (!m_running) {
+            return;
+        }
+        m_running = false;
+        if (m_thread.joinable()) {
+            m_thread.join();
+        }
+    }
+    /* 添加任务函数 */
+    void AddTask(int timeout_ms, Task task, bool is_loop = false) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        /* 计算出需要轮训的次数 */
+        size_t ticks = timeout_ms / m_interval_ms;
+        /* 在当前时间点上加上ticks */
+        size_t index = (m_current_index + ticks) % m_wheel_size;
+        /* 这里是设置在整个时间轮中,该任务在这个时间轮中的分布,以实现循环定时 */
+        size_t allindex = index;
+        for (size_t i = 1 ; allindex < m_wheel_size; i++)
+        {
+            allindex = index * i;
+            if (allindex >= m_wheel_size)
+                break;
+            wheel_[allindex].push_back(task);
+        }
+        
+    }
+
+private:
+    /* 时间片 */
+    void Tick() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        /* 取出这个时间点的函数,循环执行 */
+        auto& tasks = wheel_[m_current_index];
+        for (const auto& task : tasks) {
+            task();
+        }
+        //tasks.clear();
+        /* 可以循环定时的关键,超过了设置的最大时间点,就会重置 */
+        m_current_index = (m_current_index + 1) % m_wheel_size;
+    }
+
+private:
+    long m_max_interval = 0;                /* 最大的定时时间长度,等于下面两个之和,超过就报错 */
+    size_t m_wheel_size;                    /* 时间轮最大的轮训次数 */
+    int m_interval_ms;                      /* 每个时间点的间隔秒数,这里是1ms */
+    long m_firstLevelCount;                 /* 第一层级的时间轮的个数 */
+    long m_secondLevelCount;                /* 第二层级的时间轮的个数 */
+    long m_thirdLevelCount;                 /* 第三层级的时间轮的个数 */
+    std::vector<std::list<Task>> m_firstLevelWheel;     /* 第一层级的时间轮,这里是ms等级,总共1000个 */
+    std::vector<std::list<Task>> m_secondLevelWheel;    /* 第二层级的时间轮,这里是秒的等级,总共60个 */
+    std::vector<std::list<Task>> m_thirdLevelWheel;     /* 第三层级的时间轮,这里是分钟的等级,总共60个 */
+    size_t m_current_index;                 /* 现在的时间点 */
+    bool m_running = false;                 /* 运行标识位 */
+    std::thread m_thread;
+    std::mutex mutex_;
+    
+};
+
+
+

+ 195 - 0
common/Timer/tw.cpp

@@ -0,0 +1,195 @@
+#include "tw.h"
+
+#include <iostream>
+#include <memory.h>
+#include <chrono>
+#include <thread>
+
+TimeWheel::TimeWheel() : m_steps(0), m_firstLevelCount(0), m_secondLevelCount(60), m_thirdLevelCount(0),
+                         m_increaseId (0){
+                            memset(&m_timePos, 0, sizeof(m_timePos));
+                         }
+
+void* TimeWheel::loopForInterval(void* arg)
+{
+    if(arg == NULL) {
+        printf("valid parameter\n");
+        return NULL;
+    }
+    TimeWheel* timeWheel = reinterpret_cast<TimeWheel*>(arg);
+    while(1) {
+       std::this_thread::sleep_for(std::chrono::milliseconds(timeWheel->m_steps));
+        // printf("wake up\n");
+        TimePos pos = {0};
+        TimePos m_lastTimePos = timeWheel->m_timePos;
+        //update slot of current TimeWheel
+        timeWheel->getTriggerTimeFromInterval(timeWheel->m_steps, pos);
+        timeWheel->m_timePos = pos;
+        {
+            std::unique_lock<std::mutex> lock(timeWheel->m_mutex);
+            // if minute changed, process in integral point (minute)
+            if (pos.pos_min != m_lastTimePos.pos_min)
+            {
+                // printf("minutes changed\n");
+                std::list<Event_t>* eventList = &timeWheel->m_eventSlotList[timeWheel->m_timePos.pos_min + timeWheel->m_firstLevelCount + timeWheel->m_secondLevelCount];
+                timeWheel->processEvent(*eventList);
+                eventList->clear();
+           }
+            else if (pos.pos_sec != m_lastTimePos.pos_sec)
+            {
+                //in same minute, but second changed, now is in this integral second
+                // printf("second changed\n");
+                std::list<Event_t>* eventList = &timeWheel->m_eventSlotList[timeWheel->m_timePos.pos_sec + timeWheel->m_firstLevelCount];
+                timeWheel->processEvent(*eventList);
+                eventList->clear();
+            }
+            else if (pos.pos_ms != m_lastTimePos.pos_ms)
+            {
+                //now in this ms
+                // printf("ms changed\n");
+                std::list<Event_t>* eventList = &timeWheel->m_eventSlotList[timeWheel->m_timePos.pos_ms];
+                timeWheel->processEvent(*eventList);
+                eventList->clear();
+            }
+            // printf("loop over\n");
+        }
+     
+    }
+
+    return nullptr;
+}
+
+//init TimeWheel's step and maxmin, which detemine the max period of this wheel
+void TimeWheel::initTimeWheel(int steps, int maxMin)
+{
+    if (1000 % steps != 0){
+        printf("invalid steps\n");
+        return;
+    }
+    m_steps = steps;
+    m_firstLevelCount = 1000/steps;
+    m_thirdLevelCount = maxMin;
+
+    m_eventSlotList.resize(m_firstLevelCount + m_secondLevelCount + m_thirdLevelCount);
+    int ret = pthread_create(&m_loopThread, NULL, loopForInterval, this);
+    if(ret != 0) {
+        printf("create thread error:%s\n", strerror(errno));
+        return;
+    }
+    // pthread_join(m_loopThread, NULL);
+}
+
+void TimeWheel::createTimingEvent(int interval, EventCallback_t callback){
+    if(interval < m_steps || interval % m_steps != 0 || interval >= m_steps*m_firstLevelCount*m_secondLevelCount*m_thirdLevelCount){
+        printf("invalid interval\n");
+        return;
+    }
+    printf("start create event\n");
+    Event_t event = {0};
+    event.interval = interval;
+    event.cb = callback;
+    //set time start
+    event.timePos.pos_min = m_timePos.pos_min;
+    event.timePos.pos_sec = m_timePos.pos_sec;
+    event.timePos.pos_ms = m_timePos.pos_ms;
+    event.id = createEventId();
+    // insert it to a slot of TimeWheel
+     std::unique_lock<std::mutex> lock(m_mutex);
+    insertEventToSlot(interval, event);
+    printf("create over\n");
+}
+
+
+int TimeWheel::createEventId() {
+    return m_increaseId++;  
+}
+
+
+void TimeWheel::getTriggerTimeFromInterval(int interval, TimePos_t &timePos) {
+    //get current time: ms
+    int curTime = getCurrentMs(m_timePos);
+    // printf("interval = %d,current ms = %d\n", interval, curTime);
+
+    //caculate which slot this interval should belong to 
+    int futureTime = curTime + interval;
+    // printf("future ms = %d\n", futureTime);
+    timePos.pos_min =  (futureTime/1000/60)%m_thirdLevelCount;
+    timePos.pos_sec =  (futureTime%(1000*60))/1000;
+    timePos.pos_ms = (futureTime%1000)/m_steps;
+
+    // printf("next minPos=%d, secPos=%d, msPos=%d\n", timePos.pos_min, timePos.pos_sec, timePos.pos_ms);
+    return;
+}
+
+int TimeWheel::getCurrentMs(TimePos_t timePos) {
+    return m_steps * timePos.pos_ms + timePos.pos_sec*1000 +  timePos.pos_min*60*1000;
+}
+
+int TimeWheel::processEvent(std::list<Event_t> &eventList){
+    // printf("eventList.size=%d\n", eventList.size());
+
+    //process the event for current slot
+    for(auto event = eventList.begin(); event != eventList.end(); event ++) {
+        //caculate the current ms
+        int currentMs = getCurrentMs(m_timePos);
+        //caculate last  time(ms) this event was processed
+        int lastProcessedMs = getCurrentMs(event->timePos);
+        //caculate the distance between now and last time(ms)
+        int distanceMs = (currentMs - lastProcessedMs + (m_secondLevelCount+1)*60*1000)%((m_secondLevelCount+1)*60*1000);
+
+        //if interval == distanceMs, need process this event
+        if (event->interval == distanceMs)
+        {
+            //process event
+            event->cb();
+            //get now pos as this event's start point
+            event->timePos = m_timePos;
+            //add this event to slot
+            insertEventToSlot(event->interval, *event);
+        }
+        else
+        {
+            //this condition will be trigger when process the integral point 
+            printf("event->interval != distanceMs\n");
+            // although this event in this positon, but it not arriving timing, it will continue move to next slot caculate by distance ms.
+            insertEventToSlot(distanceMs, *event);
+        }
+    }
+    return 0;
+}
+
+void TimeWheel::insertEventToSlot(int interval, Event_t& event)
+{
+    printf("insertEventToSlot\n");
+
+    TimePos_t timePos = {0};
+
+    //caculate the which slot this event should be set to
+    getTriggerTimeFromInterval(interval, timePos);
+     {
+        // printf("timePos.pos_min=%d, m_timePos.pos_min=%d\n", timePos.pos_min, m_timePos.pos_min);
+        // printf("timePos.pos_sec=%d, m_timePos.pos_sec=%d\n", timePos.pos_sec, m_timePos.pos_sec);
+        // printf("timePos.pos_ms=%d, m_timePos.pos_ms=%d\n", timePos.pos_ms, m_timePos.pos_ms);
+
+        // if minutes not equal to current minute, first insert it to it's minute slot
+        if (timePos.pos_min != m_timePos.pos_min)
+        {
+            printf("insert to %d minute\n", m_firstLevelCount + m_secondLevelCount + timePos.pos_min);
+                m_eventSlotList[m_firstLevelCount + m_secondLevelCount + timePos.pos_min]
+                    .push_back(event);
+        }
+        // if minutes is equal, but second changed, insert slot to this  integral point second
+        else if (timePos.pos_sec != m_timePos.pos_sec)
+        {
+            printf("insert to %d sec\n",m_firstLevelCount + timePos.pos_sec);
+            m_eventSlotList[m_firstLevelCount + timePos.pos_sec].push_back(event);
+        }
+        //if minute and second is equal, mean this event will not be trigger in integral point, set it to ms slot
+        else if (timePos.pos_ms != m_timePos.pos_ms)
+        {
+            printf("insert to %d ms\n", timePos.pos_ms);
+            m_eventSlotList[timePos.pos_ms].push_back(event);
+        }
+     }
+    return;
+}

+ 54 - 0
common/Timer/tw.h

@@ -0,0 +1,54 @@
+
+
+#include <memory>
+#include <list>
+#include <vector>
+#include <mutex>
+
+typedef struct TimePos{
+    int pos_ms;
+    int pos_sec;
+    int pos_min;
+}TimePos_t;
+
+typedef struct Event {
+    int id;
+    void(*cb)(void);
+    void* arg;
+    TimePos_t timePos;
+    int interval;
+}Event_t;
+
+
+class TimeWheel {
+    typedef std::shared_ptr<TimeWheel> TimeWheelPtr;
+    typedef void (*EventCallback_t)(void );
+    typedef std::vector<std::list<Event_t>> EventSlotList_t;
+public:
+    TimeWheel();
+    
+    void initTimeWheel(int steps, int maxMin);
+    void createTimingEvent(int interval, EventCallback_t callback);
+
+public:
+    static void* loopForInterval(void* arg);
+    
+private:
+    int getCurrentMs(TimePos_t timePos);
+    int createEventId();
+    int processEvent(std::list<Event_t> &eventList);
+    void getTriggerTimeFromInterval(int interval, TimePos_t &timePos);
+    void insertEventToSlot(int interval, Event_t& event);
+
+    EventSlotList_t m_eventSlotList;
+    TimePos_t m_timePos;
+    pthread_t m_loopThread;
+
+    int m_firstLevelCount;
+    int m_secondLevelCount;
+    int m_thirdLevelCount; 
+    
+    int m_steps;
+    int m_increaseId;  // not used
+    std::mutex m_mutex;
+};

+ 0 - 103
common/TimerWheel/TimerWheel.h

@@ -1,103 +0,0 @@
-
-#include <chrono>
-#include <functional>
-#include <list>
-#include <mutex>
-#include <thread>
-#include <vector>
-#include <iostream>
-
-/**
- * @brief 这是一个时间轮定时器的实现,将任务添加到时间轮中,时间轮会在指定的时间点执行任务
- *        任务是一个函数对象,可以是lambda表达式,也可以是std::bind绑定的函数
- *        任务是挂在一个个时间点上的,同一个时间点可以有多个任务,时间点的最大数目就是任务容器最大容量
- * 
- * 缺点:
- *        1、时间轮的时间点是固定的,如果任务的时间点不在时间轮的时间点上,就会被延迟执行
- *        2、任务执行是在这个线程中执行的,如果任务执行时间过长,会影响时间轮的执行
- */
-
-class TimerWheel {
-public:
-    using Task = std::function<void()>;
-    /* 构造函数,第一个参数是任务时间点容器最大数量,第二个参数是最低检测时间单位 */
-    explicit TimerWheel(size_t wheel_size, int interval_ms)
-        : wheel_size_(wheel_size),
-        interval_ms_(interval_ms),
-        wheel_(wheel_size),
-        current_index_(0) {}
-
-    ~TimerWheel() {
-        Stop();
-    }
-    /* 开启时间轮 */
-    void Start() {
-        if (running_) {
-            return;
-        }
-        running_ = true;
-        /* 开启新线程,定时检测队列 */
-        thread_ = std::thread([this]() {
-            while (running_) {
-                std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms_));
-                Tick();
-            }
-        std::cout << "timer oooops!" << std::endl;
-            });
-        thread_.detach();
-    }
-
-    void Stop() {
-        if (!running_) {
-            return;
-        }
-        running_ = false;
-        if (thread_.joinable()) {
-            thread_.join();
-        }
-    }
-    /* 添加任务函数 */
-    void AddTask(int timeout_ms, Task task) {
-        std::lock_guard<std::mutex> lock(mutex_);
-        /* 计算出需要轮训的次数 */
-        size_t ticks = timeout_ms / interval_ms_;
-        /* 在当前时间点上加上ticks */
-        size_t index = (current_index_ + ticks) % wheel_size_;
-        /* 这里是设置在整个时间轮中,该任务在这个时间轮中的分布,以实现循环定时 */
-        size_t allindex = index;
-        for (size_t i = 1 ; allindex < wheel_size_; i++)
-        {
-            allindex = index * i;
-            if (allindex >= wheel_size_)
-                break;
-            wheel_[allindex].push_back(task);
-        }
-        
-    }
-
-private:
-    /* 时间片 */
-    void Tick() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        /* 取出这个时间点的函数,循环执行 */
-        auto& tasks = wheel_[current_index_];
-        for (const auto& task : tasks) {
-            task();
-        }
-        //tasks.clear();
-        /* 可以循环定时的关键,超过了设置的最大时间点,就会重置 */
-        current_index_ = (current_index_ + 1) % wheel_size_;
-    }
-
-private:
-    size_t wheel_size_;                     /* 时间轮最大的轮训次数 */
-    int interval_ms_;                       /* 每个时间点的间隔秒数 */
-    std::vector<std::list<Task>> wheel_;    /* 任务队列,同一个时间点可以有多个链表 */
-    size_t current_index_;                  /* 现在的时间点 */
-    bool running_ = false;
-    std::thread thread_;
-    std::mutex mutex_;
-};
-
-
-