offline_message.hpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright Takatoshi Kondo 2020
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_BROKER_OFFLINE_MESSAGE_HPP)
  7. #define MQTT_BROKER_OFFLINE_MESSAGE_HPP
  8. #include <mqtt/config.hpp>
  9. #include <boost/asio/steady_timer.hpp>
  10. #include <boost/multi_index_container.hpp>
  11. #include <boost/multi_index/ordered_index.hpp>
  12. #include <boost/multi_index/sequenced_index.hpp>
  13. #include <boost/multi_index/member.hpp>
  14. #include <mqtt/buffer.hpp>
  15. #include <mqtt/property_variant.hpp>
  16. #include <mqtt/publish.hpp>
  17. #include <mqtt/broker/broker_namespace.hpp>
  18. #include <mqtt/broker/common_type.hpp>
  19. #include <mqtt/broker/tags.hpp>
  20. #include <mqtt/broker/property_util.hpp>
  21. MQTT_BROKER_NS_BEGIN
  22. namespace mi = boost::multi_index;
  23. class offline_messages;
  24. // The offline_message structure holds messages that have been published on a
  25. // topic that a not-currently-connected client is subscribed to.
  26. // When a new connection is made with the client id for this saved data,
  27. // these messages will be published to that client, and only that client.
  28. class offline_message {
  29. public:
  30. offline_message(
  31. buffer topic,
  32. buffer contents,
  33. publish_options pubopts,
  34. v5::properties props,
  35. std::shared_ptr<as::steady_timer> tim_message_expiry)
  36. : topic_(force_move(topic)),
  37. contents_(force_move(contents)),
  38. pubopts_(pubopts),
  39. props_(force_move(props)),
  40. tim_message_expiry_(force_move(tim_message_expiry))
  41. { }
  42. bool send(endpoint_t& ep) {
  43. auto props = props_;
  44. if (tim_message_expiry_) {
  45. auto d =
  46. std::chrono::duration_cast<std::chrono::seconds>(
  47. tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
  48. ).count();
  49. if (d < 0) d = 0;
  50. set_property<v5::property::message_expiry_interval>(
  51. props,
  52. v5::property::message_expiry_interval(
  53. static_cast<uint32_t>(d)
  54. )
  55. );
  56. }
  57. auto qos_value = pubopts_.get_qos();
  58. if (qos_value == qos::at_least_once ||
  59. qos_value == qos::exactly_once) {
  60. if (auto pid = ep.acquire_unique_packet_id_no_except()) {
  61. ep.async_publish(
  62. pid.value(),
  63. force_move(topic_),
  64. force_move(contents_),
  65. pubopts_,
  66. force_move(props),
  67. any{},
  68. [sp = ep.shared_from_this()]
  69. (error_code ec) {
  70. if (ec) {
  71. MQTT_LOG("mqtt_broker", warning)
  72. << MQTT_ADD_VALUE(address, sp.get())
  73. << ec.message();
  74. }
  75. }
  76. );
  77. return true;
  78. }
  79. }
  80. else {
  81. ep.publish(
  82. topic_,
  83. contents_,
  84. pubopts_,
  85. force_move(props)
  86. );
  87. return true;
  88. }
  89. return false;
  90. }
  91. private:
  92. friend class offline_messages;
  93. buffer topic_;
  94. buffer contents_;
  95. publish_options pubopts_;
  96. v5::properties props_;
  97. std::shared_ptr<as::steady_timer> tim_message_expiry_;
  98. };
  99. class offline_messages {
  100. public:
  101. void send_until_fail(endpoint_t& ep) {
  102. auto& idx = messages_.get<tag_seq>();
  103. while (!idx.empty()) {
  104. auto it = idx.begin();
  105. // const_cast is appropriate here
  106. // See https://github.com/boostorg/multi_index/issues/50
  107. auto& m = const_cast<offline_message&>(*it);
  108. if (m.send(ep)) {
  109. idx.pop_front();
  110. }
  111. else {
  112. break;
  113. }
  114. }
  115. }
  116. void clear() {
  117. messages_.clear();
  118. }
  119. bool empty() const {
  120. return messages_.empty();
  121. }
  122. void push_back(
  123. as::io_context& timer_ioc,
  124. buffer pub_topic,
  125. buffer contents,
  126. publish_options pubopts,
  127. v5::properties props) {
  128. optional<std::chrono::steady_clock::duration> message_expiry_interval;
  129. auto v = get_property<v5::property::message_expiry_interval>(props);
  130. if (v) {
  131. message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
  132. }
  133. std::shared_ptr<as::steady_timer> tim_message_expiry;
  134. if (message_expiry_interval) {
  135. tim_message_expiry = std::make_shared<as::steady_timer>(timer_ioc, message_expiry_interval.value());
  136. tim_message_expiry->async_wait(
  137. [this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](error_code ec) mutable {
  138. if (auto sp = wp.lock()) {
  139. if (!ec) {
  140. messages_.get<tag_tim>().erase(sp);
  141. }
  142. }
  143. }
  144. );
  145. }
  146. auto& seq_idx = messages_.get<tag_seq>();
  147. seq_idx.emplace_back(
  148. force_move(pub_topic),
  149. force_move(contents),
  150. pubopts,
  151. force_move(props),
  152. force_move(tim_message_expiry)
  153. );
  154. }
  155. private:
  156. using mi_offline_message = mi::multi_index_container<
  157. offline_message,
  158. mi::indexed_by<
  159. mi::sequenced<
  160. mi::tag<tag_seq>
  161. >,
  162. mi::ordered_non_unique<
  163. mi::tag<tag_tim>,
  164. BOOST_MULTI_INDEX_MEMBER(offline_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
  165. >
  166. >
  167. >;
  168. mi_offline_message messages_;
  169. };
  170. MQTT_BROKER_NS_END
  171. #endif // MQTT_BROKER_OFFLINE_MESSAGE_HPP