| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 | // Copyright Takatoshi Kondo 2020//// Distributed under the Boost Software License, Version 1.0.// (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)#if !defined(MQTT_BROKER_OFFLINE_MESSAGE_HPP)#define MQTT_BROKER_OFFLINE_MESSAGE_HPP#include <mqtt/config.hpp>#include <boost/asio/steady_timer.hpp>#include <boost/multi_index_container.hpp>#include <boost/multi_index/ordered_index.hpp>#include <boost/multi_index/sequenced_index.hpp>#include <boost/multi_index/member.hpp>#include <mqtt/buffer.hpp>#include <mqtt/property_variant.hpp>#include <mqtt/publish.hpp>#include <mqtt/broker/broker_namespace.hpp>#include <mqtt/broker/common_type.hpp>#include <mqtt/broker/tags.hpp>#include <mqtt/broker/property_util.hpp>MQTT_BROKER_NS_BEGINnamespace mi = boost::multi_index;class offline_messages;// The offline_message structure holds messages that have been published on a// topic that a not-currently-connected client is subscribed to.// When a new connection is made with the client id for this saved data,// these messages will be published to that client, and only that client.class offline_message {public:    offline_message(        buffer topic,        buffer contents,        publish_options pubopts,        v5::properties props,        std::shared_ptr<as::steady_timer> tim_message_expiry)        : topic_(force_move(topic)),          contents_(force_move(contents)),          pubopts_(pubopts),          props_(force_move(props)),          tim_message_expiry_(force_move(tim_message_expiry))    { }    bool send(endpoint_t& ep) {        auto props = props_;        if (tim_message_expiry_) {            auto d =                std::chrono::duration_cast<std::chrono::seconds>(                    tim_message_expiry_->expiry() - std::chrono::steady_clock::now()                ).count();            if (d < 0) d = 0;            set_property<v5::property::message_expiry_interval>(                props,                v5::property::message_expiry_interval(                    static_cast<uint32_t>(d)                )            );        }        auto qos_value = pubopts_.get_qos();        if (qos_value == qos::at_least_once ||            qos_value == qos::exactly_once) {            if (auto pid = ep.acquire_unique_packet_id_no_except()) {                ep.async_publish(                    pid.value(),                    force_move(topic_),                    force_move(contents_),                    pubopts_,                    force_move(props),                    any{},                    [sp = ep.shared_from_this()]                    (error_code ec) {                        if (ec) {                            MQTT_LOG("mqtt_broker", warning)                                << MQTT_ADD_VALUE(address, sp.get())                                << ec.message();                        }                    }                );                return true;            }        }        else {            ep.publish(                topic_,                contents_,                pubopts_,                force_move(props)            );            return true;        }        return false;    }private:    friend class offline_messages;    buffer topic_;    buffer contents_;    publish_options pubopts_;    v5::properties props_;    std::shared_ptr<as::steady_timer> tim_message_expiry_;};class offline_messages {public:    void send_until_fail(endpoint_t& ep) {        auto& idx = messages_.get<tag_seq>();        while (!idx.empty()) {            auto it = idx.begin();            // const_cast is appropriate here            // See https://github.com/boostorg/multi_index/issues/50            auto& m = const_cast<offline_message&>(*it);            if (m.send(ep)) {                idx.pop_front();            }            else {                break;            }        }    }    void clear() {        messages_.clear();    }    bool empty() const {        return messages_.empty();    }    void push_back(        as::io_context& timer_ioc,        buffer pub_topic,        buffer contents,        publish_options pubopts,        v5::properties props) {        optional<std::chrono::steady_clock::duration> message_expiry_interval;        auto v = get_property<v5::property::message_expiry_interval>(props);        if (v) {            message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));        }        std::shared_ptr<as::steady_timer> tim_message_expiry;        if (message_expiry_interval) {            tim_message_expiry = std::make_shared<as::steady_timer>(timer_ioc, message_expiry_interval.value());            tim_message_expiry->async_wait(                [this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](error_code ec) mutable {                    if (auto sp = wp.lock()) {                        if (!ec) {                            messages_.get<tag_tim>().erase(sp);                        }                    }                }            );        }        auto& seq_idx = messages_.get<tag_seq>();        seq_idx.emplace_back(            force_move(pub_topic),            force_move(contents),            pubopts,            force_move(props),            force_move(tim_message_expiry)        );    }private:    using mi_offline_message = mi::multi_index_container<        offline_message,        mi::indexed_by<            mi::sequenced<                mi::tag<tag_seq>            >,            mi::ordered_non_unique<                mi::tag<tag_tim>,                BOOST_MULTI_INDEX_MEMBER(offline_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)            >        >    >;    mi_offline_message messages_;};MQTT_BROKER_NS_END#endif // MQTT_BROKER_OFFLINE_MESSAGE_HPP
 |