inflight_message.hpp 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. // Copyright Takatoshi Kondo 2020
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_BROKER_INFLIGHT_MESSAGE_HPP)
  7. #define MQTT_BROKER_INFLIGHT_MESSAGE_HPP
  8. #include <mqtt/config.hpp>
  9. #include <chrono>
  10. #include <boost/asio/steady_timer.hpp>
  11. #include <mqtt/broker/broker_namespace.hpp>
  12. #include <mqtt/message_variant.hpp>
  13. #include <mqtt/any.hpp>
  14. #include <mqtt/visitor_util.hpp>
  15. #include <mqtt/broker/common_type.hpp>
  16. #include <mqtt/broker/tags.hpp>
  17. #include <mqtt/broker/property_util.hpp>
  18. MQTT_BROKER_NS_BEGIN
  19. class inflight_messages;
  20. class inflight_message {
  21. public:
  22. inflight_message(
  23. store_message_variant msg,
  24. any life_keeper,
  25. std::shared_ptr<as::steady_timer> tim_message_expiry)
  26. :msg_ { force_move(msg) },
  27. life_keeper_ { force_move(life_keeper) },
  28. tim_message_expiry_ { force_move(tim_message_expiry) }
  29. {}
  30. packet_id_t packet_id() const {
  31. return
  32. MQTT_NS::visit(
  33. make_lambda_visitor(
  34. [](auto const& m) {
  35. return m.packet_id();
  36. }
  37. ),
  38. msg_
  39. );
  40. }
  41. void send(endpoint_t& ep) const {
  42. optional<store_message_variant> msg_opt;
  43. if (tim_message_expiry_) {
  44. MQTT_NS::visit(
  45. make_lambda_visitor(
  46. [&](v5::basic_publish_message<sizeof(packet_id_t)> const& m) {
  47. auto updated_msg = m;
  48. auto d =
  49. std::chrono::duration_cast<std::chrono::seconds>(
  50. tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
  51. ).count();
  52. if (d < 0) d = 0;
  53. updated_msg.update_prop(
  54. v5::property::message_expiry_interval(
  55. static_cast<uint32_t>(d)
  56. )
  57. );
  58. msg_opt.emplace(force_move(updated_msg));
  59. },
  60. [](auto const&) {
  61. }
  62. ),
  63. msg_
  64. );
  65. }
  66. // packet_id_exhausted never happen because inflight message has already
  67. // allocated packet_id at the previous connection.
  68. // In async_send_store_message(), packet_id is registered.
  69. ep.async_send_store_message(
  70. msg_opt ? msg_opt.value() : msg_,
  71. life_keeper_,
  72. [sp = ep.shared_from_this()](error_code ec) {
  73. if (ec) {
  74. MQTT_LOG("mqtt_broker", trace)
  75. << MQTT_ADD_VALUE(address, sp.get())
  76. << ec;
  77. }
  78. }
  79. );
  80. }
  81. private:
  82. friend class inflight_messages;
  83. store_message_variant msg_;
  84. any life_keeper_;
  85. std::shared_ptr<as::steady_timer> tim_message_expiry_;
  86. };
  87. class inflight_messages {
  88. public:
  89. void insert(
  90. store_message_variant msg,
  91. any life_keeper,
  92. std::shared_ptr<as::steady_timer> tim_message_expiry
  93. ) {
  94. messages_.emplace_back(
  95. force_move(msg),
  96. force_move(life_keeper),
  97. force_move(tim_message_expiry)
  98. );
  99. }
  100. void send_all_messages(endpoint_t& ep) {
  101. for (auto const& ifm : messages_) {
  102. ifm.send(ep);
  103. }
  104. }
  105. void clear() {
  106. messages_.clear();
  107. }
  108. template <typename Tag>
  109. decltype(auto) get() {
  110. return messages_.get<Tag>();
  111. }
  112. template <typename Tag>
  113. decltype(auto) get() const {
  114. return messages_.get<Tag>();
  115. }
  116. private:
  117. using mi_inflight_message = mi::multi_index_container<
  118. inflight_message,
  119. mi::indexed_by<
  120. mi::sequenced<
  121. mi::tag<tag_seq>
  122. >,
  123. mi::ordered_unique<
  124. mi::tag<tag_pid>,
  125. BOOST_MULTI_INDEX_CONST_MEM_FUN(inflight_message, packet_id_t, packet_id)
  126. >,
  127. mi::ordered_non_unique<
  128. mi::tag<tag_tim>,
  129. BOOST_MULTI_INDEX_MEMBER(inflight_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
  130. >
  131. >
  132. >;
  133. mi_inflight_message messages_;
  134. };
  135. MQTT_BROKER_NS_END
  136. #endif // MQTT_BROKER_INFLIGHT_MESSAGE_HPP