store.hpp 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. // Copyright Takatoshi Kondo 2022
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_STORE_HPP)
  7. #define MQTT_STORE_HPP
  8. #include <mqtt/config.hpp> // should be top to configure variant limit
  9. #include <boost/multi_index_container.hpp>
  10. #include <boost/multi_index/ordered_index.hpp>
  11. #include <boost/multi_index/mem_fun.hpp>
  12. #include <boost/multi_index/sequenced_index.hpp>
  13. #include <boost/multi_index/composite_key.hpp>
  14. #include <mqtt/any.hpp>
  15. #include <mqtt/message_variant.hpp>
  16. #include <mqtt/packet_id_type.hpp>
  17. namespace MQTT_NS {
  18. namespace mi = boost::multi_index;
  19. enum class store_insert_update_result {
  20. inserted,
  21. updated
  22. };
  23. template <std::size_t PacketIdBytes>
  24. class store {
  25. private:
  26. struct tag_packet_id {};
  27. struct tag_packet_id_type {};
  28. struct tag_seq {};
  29. public:
  30. using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
  31. bool insert(
  32. packet_id_t packet_id,
  33. control_packet_type expected_type,
  34. basic_store_message_variant<PacketIdBytes> smv,
  35. any life_keeper
  36. ) {
  37. auto ret = elems_.emplace(
  38. packet_id,
  39. expected_type,
  40. force_move(smv),
  41. force_move(life_keeper)
  42. );
  43. return ret.second;
  44. }
  45. store_insert_update_result insert_or_update(
  46. packet_id_t packet_id,
  47. control_packet_type expected_type,
  48. basic_store_message_variant<PacketIdBytes> smv,
  49. any life_keeper
  50. ) {
  51. auto ret = elems_.emplace(
  52. packet_id,
  53. expected_type,
  54. force_move(smv),
  55. force_move(life_keeper)
  56. );
  57. if (ret.second) return store_insert_update_result::inserted;
  58. // When client want to restore serialized messages,
  59. // endpoint might keep the message that has the same packet_id.
  60. // In this case, overwrite the element.
  61. // entry exists
  62. elems_.modify(
  63. ret.first,
  64. [&] (auto& e) {
  65. e.packet_id_ = packet_id;
  66. e.expected_control_packet_type_ = expected_type;
  67. e.smv_ = force_move(smv);
  68. e.life_keeper_ = force_move(life_keeper);
  69. }
  70. );
  71. return store_insert_update_result::updated;
  72. }
  73. void for_each(
  74. std::function<
  75. // if return true, then erase element
  76. bool(basic_store_message_variant<PacketIdBytes> const&, any const&)
  77. > const& f
  78. ) {
  79. auto& idx = elems_.template get<tag_seq>();
  80. auto it = idx.begin();
  81. auto end = idx.end();
  82. while (it != end) {
  83. if (f(it->message(), it->life_keeper())) {
  84. it = idx.erase(it);
  85. }
  86. else {
  87. ++it;
  88. }
  89. }
  90. }
  91. std::size_t erase(packet_id_t packet_id) {
  92. auto& idx = elems_.template get<tag_packet_id>();
  93. return idx.erase(packet_id);
  94. }
  95. bool erase(packet_id_t packet_id, control_packet_type type) {
  96. auto& idx = elems_.template get<tag_packet_id_type>();
  97. auto ret = idx.equal_range(std::make_tuple(packet_id, type));
  98. if (ret.first == ret.second) return false;
  99. idx.erase(ret.first, ret.second);
  100. return true;
  101. }
  102. void clear() {
  103. elems_.clear();
  104. }
  105. bool empty() const {
  106. return elems_.empty();
  107. }
  108. private:
  109. struct elem_t {
  110. friend class store;
  111. elem_t(
  112. packet_id_t id,
  113. control_packet_type type,
  114. basic_store_message_variant<PacketIdBytes> smv,
  115. any life_keeper = any())
  116. : packet_id_(id)
  117. , expected_control_packet_type_(type)
  118. , smv_(force_move(smv))
  119. , life_keeper_(force_move(life_keeper)) {}
  120. packet_id_t packet_id() const { return packet_id_; }
  121. control_packet_type expected_control_packet_type() const { return expected_control_packet_type_; }
  122. basic_store_message_variant<PacketIdBytes> const& message() const {
  123. return smv_;
  124. }
  125. basic_store_message_variant<PacketIdBytes>& message() {
  126. return smv_;
  127. }
  128. any const& life_keeper() const {
  129. return life_keeper_;
  130. }
  131. bool is_publish() const {
  132. return
  133. expected_control_packet_type_ == control_packet_type::puback ||
  134. expected_control_packet_type_ == control_packet_type::pubrec;
  135. }
  136. private:
  137. packet_id_t packet_id_;
  138. control_packet_type expected_control_packet_type_;
  139. basic_store_message_variant<PacketIdBytes> smv_;
  140. any life_keeper_;
  141. };
  142. using mi_elem = mi::multi_index_container<
  143. elem_t,
  144. mi::indexed_by<
  145. mi::ordered_unique<
  146. mi::tag<tag_packet_id_type>,
  147. mi::composite_key<
  148. elem_t,
  149. mi::const_mem_fun<
  150. elem_t, packet_id_t,
  151. &elem_t::packet_id
  152. >,
  153. mi::const_mem_fun<
  154. elem_t, control_packet_type,
  155. &elem_t::expected_control_packet_type
  156. >
  157. >
  158. >,
  159. mi::ordered_non_unique<
  160. mi::tag<tag_packet_id>,
  161. mi::const_mem_fun<
  162. elem_t, packet_id_t,
  163. &elem_t::packet_id
  164. >
  165. >,
  166. mi::sequenced<
  167. mi::tag<tag_seq>
  168. >
  169. >
  170. >;
  171. mi_elem elems_;
  172. };
  173. } // namespace MQTT_NS
  174. #endif // MQTT_STORE_HPP