// tw.cpp -- TimeWheel implementation.
  1. #include "tw.h"
  2. #include <iostream>
  3. #include <memory.h>
  4. #include <chrono>
  5. #include <thread>
  6. TimeWheel::TimeWheel() : m_steps(0), m_firstLevelCount(0), m_secondLevelCount(60), m_thirdLevelCount(0),
  7. m_increaseId (0){
  8. memset(&m_timePos, 0, sizeof(m_timePos));
  9. }
  10. void* TimeWheel::loopForInterval(void* arg)
  11. {
  12. if(arg == NULL) {
  13. printf("valid parameter\n");
  14. return NULL;
  15. }
  16. TimeWheel* timeWheel = reinterpret_cast<TimeWheel*>(arg);
  17. while(1) {
  18. std::this_thread::sleep_for(std::chrono::milliseconds(timeWheel->m_steps));
  19. // printf("wake up\n");
  20. TimePos pos = {0};
  21. TimePos m_lastTimePos = timeWheel->m_timePos;
  22. //update slot of current TimeWheel
  23. timeWheel->getTriggerTimeFromInterval(timeWheel->m_steps, pos);
  24. timeWheel->m_timePos = pos;
  25. {
  26. std::unique_lock<std::mutex> lock(timeWheel->m_mutex);
  27. // if minute changed, process in integral point (minute)
  28. if (pos.pos_min != m_lastTimePos.pos_min)
  29. {
  30. // printf("minutes changed\n");
  31. std::list<Event_t>* eventList = &timeWheel->m_eventSlotList[timeWheel->m_timePos.pos_min + timeWheel->m_firstLevelCount + timeWheel->m_secondLevelCount];
  32. timeWheel->processEvent(*eventList);
  33. eventList->clear();
  34. }
  35. else if (pos.pos_sec != m_lastTimePos.pos_sec)
  36. {
  37. //in same minute, but second changed, now is in this integral second
  38. // printf("second changed\n");
  39. std::list<Event_t>* eventList = &timeWheel->m_eventSlotList[timeWheel->m_timePos.pos_sec + timeWheel->m_firstLevelCount];
  40. timeWheel->processEvent(*eventList);
  41. eventList->clear();
  42. }
  43. else if (pos.pos_ms != m_lastTimePos.pos_ms)
  44. {
  45. //now in this ms
  46. // printf("ms changed\n");
  47. std::list<Event_t>* eventList = &timeWheel->m_eventSlotList[timeWheel->m_timePos.pos_ms];
  48. timeWheel->processEvent(*eventList);
  49. eventList->clear();
  50. }
  51. // printf("loop over\n");
  52. }
  53. }
  54. return nullptr;
  55. }
  56. //init TimeWheel's step and maxmin, which detemine the max period of this wheel
  57. void TimeWheel::initTimeWheel(int steps, int maxMin)
  58. {
  59. if (1000 % steps != 0){
  60. printf("invalid steps\n");
  61. return;
  62. }
  63. m_steps = steps;
  64. m_firstLevelCount = 1000/steps;
  65. m_thirdLevelCount = maxMin;
  66. m_eventSlotList.resize(m_firstLevelCount + m_secondLevelCount + m_thirdLevelCount);
  67. int ret = pthread_create(&m_loopThread, NULL, loopForInterval, this);
  68. if(ret != 0) {
  69. printf("create thread error:%s\n", strerror(errno));
  70. return;
  71. }
  72. // pthread_join(m_loopThread, NULL);
  73. }
  74. void TimeWheel::createTimingEvent(int interval, EventCallback_t callback){
  75. if(interval < m_steps || interval % m_steps != 0 || interval >= m_steps*m_firstLevelCount*m_secondLevelCount*m_thirdLevelCount){
  76. printf("invalid interval\n");
  77. return;
  78. }
  79. printf("start create event\n");
  80. Event_t event = {0};
  81. event.interval = interval;
  82. event.cb = callback;
  83. //set time start
  84. event.timePos.pos_min = m_timePos.pos_min;
  85. event.timePos.pos_sec = m_timePos.pos_sec;
  86. event.timePos.pos_ms = m_timePos.pos_ms;
  87. event.id = createEventId();
  88. // insert it to a slot of TimeWheel
  89. std::unique_lock<std::mutex> lock(m_mutex);
  90. insertEventToSlot(interval, event);
  91. printf("create over\n");
  92. }
  93. int TimeWheel::createEventId() {
  94. return m_increaseId++;
  95. }
  96. void TimeWheel::getTriggerTimeFromInterval(int interval, TimePos_t &timePos) {
  97. //get current time: ms
  98. int curTime = getCurrentMs(m_timePos);
  99. // printf("interval = %d,current ms = %d\n", interval, curTime);
  100. //caculate which slot this interval should belong to
  101. int futureTime = curTime + interval;
  102. // printf("future ms = %d\n", futureTime);
  103. timePos.pos_min = (futureTime/1000/60)%m_thirdLevelCount;
  104. timePos.pos_sec = (futureTime%(1000*60))/1000;
  105. timePos.pos_ms = (futureTime%1000)/m_steps;
  106. // printf("next minPos=%d, secPos=%d, msPos=%d\n", timePos.pos_min, timePos.pos_sec, timePos.pos_ms);
  107. return;
  108. }
  109. int TimeWheel::getCurrentMs(TimePos_t timePos) {
  110. return m_steps * timePos.pos_ms + timePos.pos_sec*1000 + timePos.pos_min*60*1000;
  111. }
  112. int TimeWheel::processEvent(std::list<Event_t> &eventList){
  113. // printf("eventList.size=%d\n", eventList.size());
  114. //process the event for current slot
  115. for(auto event = eventList.begin(); event != eventList.end(); event ++) {
  116. //caculate the current ms
  117. int currentMs = getCurrentMs(m_timePos);
  118. //caculate last time(ms) this event was processed
  119. int lastProcessedMs = getCurrentMs(event->timePos);
  120. //caculate the distance between now and last time(ms)
  121. int distanceMs = (currentMs - lastProcessedMs + (m_secondLevelCount+1)*60*1000)%((m_secondLevelCount+1)*60*1000);
  122. //if interval == distanceMs, need process this event
  123. if (event->interval == distanceMs)
  124. {
  125. //process event
  126. event->cb();
  127. //get now pos as this event's start point
  128. event->timePos = m_timePos;
  129. //add this event to slot
  130. insertEventToSlot(event->interval, *event);
  131. }
  132. else
  133. {
  134. //this condition will be trigger when process the integral point
  135. printf("event->interval != distanceMs\n");
  136. // although this event in this positon, but it not arriving timing, it will continue move to next slot caculate by distance ms.
  137. insertEventToSlot(distanceMs, *event);
  138. }
  139. }
  140. return 0;
  141. }
  142. void TimeWheel::insertEventToSlot(int interval, Event_t& event)
  143. {
  144. printf("insertEventToSlot\n");
  145. TimePos_t timePos = {0};
  146. //caculate the which slot this event should be set to
  147. getTriggerTimeFromInterval(interval, timePos);
  148. {
  149. // printf("timePos.pos_min=%d, m_timePos.pos_min=%d\n", timePos.pos_min, m_timePos.pos_min);
  150. // printf("timePos.pos_sec=%d, m_timePos.pos_sec=%d\n", timePos.pos_sec, m_timePos.pos_sec);
  151. // printf("timePos.pos_ms=%d, m_timePos.pos_ms=%d\n", timePos.pos_ms, m_timePos.pos_ms);
  152. // if minutes not equal to current minute, first insert it to it's minute slot
  153. if (timePos.pos_min != m_timePos.pos_min)
  154. {
  155. printf("insert to %d minute\n", m_firstLevelCount + m_secondLevelCount + timePos.pos_min);
  156. m_eventSlotList[m_firstLevelCount + m_secondLevelCount + timePos.pos_min]
  157. .push_back(event);
  158. }
  159. // if minutes is equal, but second changed, insert slot to this integral point second
  160. else if (timePos.pos_sec != m_timePos.pos_sec)
  161. {
  162. printf("insert to %d sec\n",m_firstLevelCount + timePos.pos_sec);
  163. m_eventSlotList[m_firstLevelCount + timePos.pos_sec].push_back(event);
  164. }
  165. //if minute and second is equal, mean this event will not be trigger in integral point, set it to ms slot
  166. else if (timePos.pos_ms != m_timePos.pos_ms)
  167. {
  168. printf("insert to %d ms\n", timePos.pos_ms);
  169. m_eventSlotList[timePos.pos_ms].push_back(event);
  170. }
  171. }
  172. return;
  173. }