| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 | // Copyright Takatoshi Kondo 2020//// Distributed under the Boost Software License, Version 1.0.// (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)#if !defined(MQTT_BROKER_INFLIGHT_MESSAGE_HPP)#define MQTT_BROKER_INFLIGHT_MESSAGE_HPP#include <mqtt/config.hpp>#include <chrono>#include <boost/asio/steady_timer.hpp>#include <mqtt/broker/broker_namespace.hpp>#include <mqtt/message_variant.hpp>#include <mqtt/any.hpp>#include <mqtt/visitor_util.hpp>#include <mqtt/broker/common_type.hpp>#include <mqtt/broker/tags.hpp>#include <mqtt/broker/property_util.hpp>MQTT_BROKER_NS_BEGINclass inflight_messages;class inflight_message {public:    inflight_message(        store_message_variant msg,        any life_keeper,        std::shared_ptr<as::steady_timer> tim_message_expiry)        :msg_ { force_move(msg) },         life_keeper_ { force_move(life_keeper) },         tim_message_expiry_ { force_move(tim_message_expiry) }    {}    packet_id_t packet_id() const {        return            MQTT_NS::visit(                make_lambda_visitor(                    [](auto const& m) {                        return m.packet_id();                    }                ),                msg_            );    }    void send(endpoint_t& ep) const {        optional<store_message_variant> msg_opt;        if (tim_message_expiry_) {            MQTT_NS::visit(                make_lambda_visitor(                    [&](v5::basic_publish_message<sizeof(packet_id_t)> const& m) {                        auto updated_msg = m;                        auto d =                            std::chrono::duration_cast<std::chrono::seconds>(                                tim_message_expiry_->expiry() - std::chrono::steady_clock::now()                            ).count();                        if (d < 0) d = 0;                        updated_msg.update_prop(                            v5::property::message_expiry_interval(                                static_cast<uint32_t>(d)                            )                        );                        msg_opt.emplace(force_move(updated_msg));                    },                    [](auto const&) {                    }                ),                msg_            );        }        // packet_id_exhausted never happen because inflight message has already        // allocated packet_id at the previous connection.        // In async_send_store_message(), packet_id is registered.        ep.async_send_store_message(            msg_opt ? msg_opt.value() : msg_,            life_keeper_,            [sp = ep.shared_from_this()](error_code ec) {                if (ec) {                    MQTT_LOG("mqtt_broker", trace)                        << MQTT_ADD_VALUE(address, sp.get())                        <<  ec;                }            }        );    }private:    friend class inflight_messages;    store_message_variant msg_;    any life_keeper_;    std::shared_ptr<as::steady_timer> tim_message_expiry_;};class inflight_messages {public:    void insert(        store_message_variant msg,        any life_keeper,        std::shared_ptr<as::steady_timer> tim_message_expiry    ) {        messages_.emplace_back(            force_move(msg),            force_move(life_keeper),            force_move(tim_message_expiry)        );    }    void send_all_messages(endpoint_t& ep) {        for (auto const& ifm : messages_) {            ifm.send(ep);        }    }    void clear() {        messages_.clear();    }    template <typename Tag>    decltype(auto) get() {        return messages_.get<Tag>();    }    template <typename Tag>    decltype(auto) get() const {        return messages_.get<Tag>();    }private:    using mi_inflight_message = mi::multi_index_container<        inflight_message,        mi::indexed_by<            mi::sequenced<                mi::tag<tag_seq>            >,            mi::ordered_unique<                mi::tag<tag_pid>,                BOOST_MULTI_INDEX_CONST_MEM_FUN(inflight_message, packet_id_t, packet_id)            >,            mi::ordered_non_unique<                mi::tag<tag_tim>,                BOOST_MULTI_INDEX_MEMBER(inflight_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)            >        >    >;    mi_inflight_message messages_;};MQTT_BROKER_NS_END#endif // MQTT_BROKER_INFLIGHT_MESSAGE_HPP
 |