123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- // Copyright Takatoshi Kondo 2022
- //
- // Distributed under the Boost Software License, Version 1.0.
- // (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- #if !defined(MQTT_STORE_HPP)
- #define MQTT_STORE_HPP
- #include <mqtt/config.hpp> // should be top to configure variant limit
- #include <boost/multi_index_container.hpp>
- #include <boost/multi_index/ordered_index.hpp>
- #include <boost/multi_index/mem_fun.hpp>
- #include <boost/multi_index/sequenced_index.hpp>
- #include <boost/multi_index/composite_key.hpp>
- #include <mqtt/any.hpp>
- #include <mqtt/message_variant.hpp>
- #include <mqtt/packet_id_type.hpp>
- namespace MQTT_NS {
- namespace mi = boost::multi_index;
- enum class store_insert_update_result {
- inserted,
- updated
- };
- template <std::size_t PacketIdBytes>
- class store {
- private:
- struct tag_packet_id {};
- struct tag_packet_id_type {};
- struct tag_seq {};
- public:
- using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
- bool insert(
- packet_id_t packet_id,
- control_packet_type expected_type,
- basic_store_message_variant<PacketIdBytes> smv,
- any life_keeper
- ) {
- auto ret = elems_.emplace(
- packet_id,
- expected_type,
- force_move(smv),
- force_move(life_keeper)
- );
- return ret.second;
- }
- store_insert_update_result insert_or_update(
- packet_id_t packet_id,
- control_packet_type expected_type,
- basic_store_message_variant<PacketIdBytes> smv,
- any life_keeper
- ) {
- auto ret = elems_.emplace(
- packet_id,
- expected_type,
- force_move(smv),
- force_move(life_keeper)
- );
- if (ret.second) return store_insert_update_result::inserted;
- // When client want to restore serialized messages,
- // endpoint might keep the message that has the same packet_id.
- // In this case, overwrite the element.
- // entry exists
- elems_.modify(
- ret.first,
- [&] (auto& e) {
- e.packet_id_ = packet_id;
- e.expected_control_packet_type_ = expected_type;
- e.smv_ = force_move(smv);
- e.life_keeper_ = force_move(life_keeper);
- }
- );
- return store_insert_update_result::updated;
- }
- void for_each(
- std::function<
- // if return true, then erase element
- bool(basic_store_message_variant<PacketIdBytes> const&, any const&)
- > const& f
- ) {
- auto& idx = elems_.template get<tag_seq>();
- auto it = idx.begin();
- auto end = idx.end();
- while (it != end) {
- if (f(it->message(), it->life_keeper())) {
- it = idx.erase(it);
- }
- else {
- ++it;
- }
- }
- }
- std::size_t erase(packet_id_t packet_id) {
- auto& idx = elems_.template get<tag_packet_id>();
- return idx.erase(packet_id);
- }
- bool erase(packet_id_t packet_id, control_packet_type type) {
- auto& idx = elems_.template get<tag_packet_id_type>();
- auto ret = idx.equal_range(std::make_tuple(packet_id, type));
- if (ret.first == ret.second) return false;
- idx.erase(ret.first, ret.second);
- return true;
- }
- void clear() {
- elems_.clear();
- }
- bool empty() const {
- return elems_.empty();
- }
- private:
- struct elem_t {
- friend class store;
- elem_t(
- packet_id_t id,
- control_packet_type type,
- basic_store_message_variant<PacketIdBytes> smv,
- any life_keeper = any())
- : packet_id_(id)
- , expected_control_packet_type_(type)
- , smv_(force_move(smv))
- , life_keeper_(force_move(life_keeper)) {}
- packet_id_t packet_id() const { return packet_id_; }
- control_packet_type expected_control_packet_type() const { return expected_control_packet_type_; }
- basic_store_message_variant<PacketIdBytes> const& message() const {
- return smv_;
- }
- basic_store_message_variant<PacketIdBytes>& message() {
- return smv_;
- }
- any const& life_keeper() const {
- return life_keeper_;
- }
- bool is_publish() const {
- return
- expected_control_packet_type_ == control_packet_type::puback ||
- expected_control_packet_type_ == control_packet_type::pubrec;
- }
- private:
- packet_id_t packet_id_;
- control_packet_type expected_control_packet_type_;
- basic_store_message_variant<PacketIdBytes> smv_;
- any life_keeper_;
- };
- using mi_elem = mi::multi_index_container<
- elem_t,
- mi::indexed_by<
- mi::ordered_unique<
- mi::tag<tag_packet_id_type>,
- mi::composite_key<
- elem_t,
- mi::const_mem_fun<
- elem_t, packet_id_t,
- &elem_t::packet_id
- >,
- mi::const_mem_fun<
- elem_t, control_packet_type,
- &elem_t::expected_control_packet_type
- >
- >
- >,
- mi::ordered_non_unique<
- mi::tag<tag_packet_id>,
- mi::const_mem_fun<
- elem_t, packet_id_t,
- &elem_t::packet_id
- >
- >,
- mi::sequenced<
- mi::tag<tag_seq>
- >
- >
- >;
- mi_elem elems_;
- };
- } // namespace MQTT_NS
- #endif // MQTT_STORE_HPP
|