mpmc_blocking_q.h 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
  2. // Distributed under the MIT License (http://opensource.org/licenses/MIT)
  3. #pragma once
  4. // multi producer-multi consumer blocking queue.
  5. // enqueue(..) - will block until room found to put the new message.
  6. // enqueue_nowait(..) - will return immediately; if no room is left, the
  7. // oldest message in the queue is overrun.
  8. // dequeue_for(..) - will block until the queue is not empty or the timeout
  9. // has passed.
  10. #include <spdlog/details/circular_q.h>
  11. #include <atomic>
  12. #include <condition_variable>
  13. #include <mutex>
  14. namespace spdlog {
  15. namespace details {
  16. template <typename T>
  17. class mpmc_blocking_queue {
  18. public:
  19. using item_type = T;
  20. explicit mpmc_blocking_queue(size_t max_items)
  21. : q_(max_items) {}
  22. #ifndef __MINGW32__
  23. // try to enqueue and block if no room left
  24. void enqueue(T &&item) {
  25. {
  26. std::unique_lock<std::mutex> lock(queue_mutex_);
  27. pop_cv_.wait(lock, [this] { return !this->q_.full(); });
  28. q_.push_back(std::move(item));
  29. }
  30. push_cv_.notify_one();
  31. }
  32. // enqueue immediately. overrun oldest message in the queue if no room left.
  33. void enqueue_nowait(T &&item) {
  34. {
  35. std::unique_lock<std::mutex> lock(queue_mutex_);
  36. q_.push_back(std::move(item));
  37. }
  38. push_cv_.notify_one();
  39. }
  40. void enqueue_if_have_room(T &&item) {
  41. bool pushed = false;
  42. {
  43. std::unique_lock<std::mutex> lock(queue_mutex_);
  44. if (!q_.full()) {
  45. q_.push_back(std::move(item));
  46. pushed = true;
  47. }
  48. }
  49. if (pushed) {
  50. push_cv_.notify_one();
  51. } else {
  52. ++discard_counter_;
  53. }
  54. }
  55. // dequeue with a timeout.
  56. // Return true, if succeeded dequeue item, false otherwise
  57. bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) {
  58. {
  59. std::unique_lock<std::mutex> lock(queue_mutex_);
  60. if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) {
  61. return false;
  62. }
  63. popped_item = std::move(q_.front());
  64. q_.pop_front();
  65. }
  66. pop_cv_.notify_one();
  67. return true;
  68. }
  69. // blocking dequeue without a timeout.
  70. void dequeue(T &popped_item) {
  71. {
  72. std::unique_lock<std::mutex> lock(queue_mutex_);
  73. push_cv_.wait(lock, [this] { return !this->q_.empty(); });
  74. popped_item = std::move(q_.front());
  75. q_.pop_front();
  76. }
  77. pop_cv_.notify_one();
  78. }
  79. #else
  80. // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
  81. // so release the mutex at the very end each function.
  82. // try to enqueue and block if no room left
  83. void enqueue(T &&item) {
  84. std::unique_lock<std::mutex> lock(queue_mutex_);
  85. pop_cv_.wait(lock, [this] { return !this->q_.full(); });
  86. q_.push_back(std::move(item));
  87. push_cv_.notify_one();
  88. }
  89. // enqueue immediately. overrun oldest message in the queue if no room left.
  90. void enqueue_nowait(T &&item) {
  91. std::unique_lock<std::mutex> lock(queue_mutex_);
  92. q_.push_back(std::move(item));
  93. push_cv_.notify_one();
  94. }
  95. void enqueue_if_have_room(T &&item) {
  96. bool pushed = false;
  97. std::unique_lock<std::mutex> lock(queue_mutex_);
  98. if (!q_.full()) {
  99. q_.push_back(std::move(item));
  100. pushed = true;
  101. }
  102. if (pushed) {
  103. push_cv_.notify_one();
  104. } else {
  105. ++discard_counter_;
  106. }
  107. }
  108. // dequeue with a timeout.
  109. // Return true, if succeeded dequeue item, false otherwise
  110. bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) {
  111. std::unique_lock<std::mutex> lock(queue_mutex_);
  112. if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) {
  113. return false;
  114. }
  115. popped_item = std::move(q_.front());
  116. q_.pop_front();
  117. pop_cv_.notify_one();
  118. return true;
  119. }
  120. // blocking dequeue without a timeout.
  121. void dequeue(T &popped_item) {
  122. std::unique_lock<std::mutex> lock(queue_mutex_);
  123. push_cv_.wait(lock, [this] { return !this->q_.empty(); });
  124. popped_item = std::move(q_.front());
  125. q_.pop_front();
  126. pop_cv_.notify_one();
  127. }
  128. #endif
  129. size_t overrun_counter() {
  130. std::unique_lock<std::mutex> lock(queue_mutex_);
  131. return q_.overrun_counter();
  132. }
  133. size_t discard_counter() { return discard_counter_.load(std::memory_order_relaxed); }
  134. size_t size() {
  135. std::unique_lock<std::mutex> lock(queue_mutex_);
  136. return q_.size();
  137. }
  138. void reset_overrun_counter() {
  139. std::unique_lock<std::mutex> lock(queue_mutex_);
  140. q_.reset_overrun_counter();
  141. }
  142. void reset_discard_counter() { discard_counter_.store(0, std::memory_order_relaxed); }
  143. private:
  144. std::mutex queue_mutex_;
  145. std::condition_variable push_cv_;
  146. std::condition_variable pop_cv_;
  147. spdlog::details::circular_q<T> q_;
  148. std::atomic<size_t> discard_counter_{0};
  149. };
  150. } // namespace details
  151. } // namespace spdlog