| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 | 
							- // Copyright Takatoshi Kondo 2020
 
- //
 
- // Distributed under the Boost Software License, Version 1.0.
 
- // (See accompanying file LICENSE_1_0.txt or copy at
 
- // http://www.boost.org/LICENSE_1_0.txt)
 
- #if !defined(MQTT_BROKER_OFFLINE_MESSAGE_HPP)
 
- #define MQTT_BROKER_OFFLINE_MESSAGE_HPP
 
- #include <mqtt/config.hpp>
 
- #include <boost/asio/steady_timer.hpp>
 
- #include <boost/multi_index_container.hpp>
 
- #include <boost/multi_index/ordered_index.hpp>
 
- #include <boost/multi_index/sequenced_index.hpp>
 
- #include <boost/multi_index/member.hpp>
 
- #include <mqtt/buffer.hpp>
 
- #include <mqtt/property_variant.hpp>
 
- #include <mqtt/publish.hpp>
 
- #include <mqtt/broker/broker_namespace.hpp>
 
- #include <mqtt/broker/common_type.hpp>
 
- #include <mqtt/broker/tags.hpp>
 
- #include <mqtt/broker/property_util.hpp>
 
- MQTT_BROKER_NS_BEGIN
 
- namespace mi = boost::multi_index;
 
- class offline_messages;
 
- // The offline_message structure holds messages that have been published on a
 
- // topic that a not-currently-connected client is subscribed to.
 
- // When a new connection is made with the client id for this saved data,
 
- // these messages will be published to that client, and only that client.
 
- class offline_message {
 
- public:
 
-     offline_message(
 
-         buffer topic,
 
-         buffer contents,
 
-         publish_options pubopts,
 
-         v5::properties props,
 
-         std::shared_ptr<as::steady_timer> tim_message_expiry)
 
-         : topic_(force_move(topic)),
 
-           contents_(force_move(contents)),
 
-           pubopts_(pubopts),
 
-           props_(force_move(props)),
 
-           tim_message_expiry_(force_move(tim_message_expiry))
 
-     { }
 
-     bool send(endpoint_t& ep) {
 
-         auto props = props_;
 
-         if (tim_message_expiry_) {
 
-             auto d =
 
-                 std::chrono::duration_cast<std::chrono::seconds>(
 
-                     tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
 
-                 ).count();
 
-             if (d < 0) d = 0;
 
-             set_property<v5::property::message_expiry_interval>(
 
-                 props,
 
-                 v5::property::message_expiry_interval(
 
-                     static_cast<uint32_t>(d)
 
-                 )
 
-             );
 
-         }
 
-         auto qos_value = pubopts_.get_qos();
 
-         if (qos_value == qos::at_least_once ||
 
-             qos_value == qos::exactly_once) {
 
-             if (auto pid = ep.acquire_unique_packet_id_no_except()) {
 
-                 ep.async_publish(
 
-                     pid.value(),
 
-                     force_move(topic_),
 
-                     force_move(contents_),
 
-                     pubopts_,
 
-                     force_move(props),
 
-                     any{},
 
-                     [sp = ep.shared_from_this()]
 
-                     (error_code ec) {
 
-                         if (ec) {
 
-                             MQTT_LOG("mqtt_broker", warning)
 
-                                 << MQTT_ADD_VALUE(address, sp.get())
 
-                                 << ec.message();
 
-                         }
 
-                     }
 
-                 );
 
-                 return true;
 
-             }
 
-         }
 
-         else {
 
-             ep.publish(
 
-                 topic_,
 
-                 contents_,
 
-                 pubopts_,
 
-                 force_move(props)
 
-             );
 
-             return true;
 
-         }
 
-         return false;
 
-     }
 
- private:
 
-     friend class offline_messages;
 
-     buffer topic_;
 
-     buffer contents_;
 
-     publish_options pubopts_;
 
-     v5::properties props_;
 
-     std::shared_ptr<as::steady_timer> tim_message_expiry_;
 
- };
 
- class offline_messages {
 
- public:
 
-     void send_until_fail(endpoint_t& ep) {
 
-         auto& idx = messages_.get<tag_seq>();
 
-         while (!idx.empty()) {
 
-             auto it = idx.begin();
 
-             // const_cast is appropriate here
 
-             // See https://github.com/boostorg/multi_index/issues/50
 
-             auto& m = const_cast<offline_message&>(*it);
 
-             if (m.send(ep)) {
 
-                 idx.pop_front();
 
-             }
 
-             else {
 
-                 break;
 
-             }
 
-         }
 
-     }
 
-     void clear() {
 
-         messages_.clear();
 
-     }
 
-     bool empty() const {
 
-         return messages_.empty();
 
-     }
 
-     void push_back(
 
-         as::io_context& timer_ioc,
 
-         buffer pub_topic,
 
-         buffer contents,
 
-         publish_options pubopts,
 
-         v5::properties props) {
 
-         optional<std::chrono::steady_clock::duration> message_expiry_interval;
 
-         auto v = get_property<v5::property::message_expiry_interval>(props);
 
-         if (v) {
 
-             message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
 
-         }
 
-         std::shared_ptr<as::steady_timer> tim_message_expiry;
 
-         if (message_expiry_interval) {
 
-             tim_message_expiry = std::make_shared<as::steady_timer>(timer_ioc, message_expiry_interval.value());
 
-             tim_message_expiry->async_wait(
 
-                 [this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](error_code ec) mutable {
 
-                     if (auto sp = wp.lock()) {
 
-                         if (!ec) {
 
-                             messages_.get<tag_tim>().erase(sp);
 
-                         }
 
-                     }
 
-                 }
 
-             );
 
-         }
 
-         auto& seq_idx = messages_.get<tag_seq>();
 
-         seq_idx.emplace_back(
 
-             force_move(pub_topic),
 
-             force_move(contents),
 
-             pubopts,
 
-             force_move(props),
 
-             force_move(tim_message_expiry)
 
-         );
 
-     }
 
- private:
 
-     using mi_offline_message = mi::multi_index_container<
 
-         offline_message,
 
-         mi::indexed_by<
 
-             mi::sequenced<
 
-                 mi::tag<tag_seq>
 
-             >,
 
-             mi::ordered_non_unique<
 
-                 mi::tag<tag_tim>,
 
-                 BOOST_MULTI_INDEX_MEMBER(offline_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
 
-             >
 
-         >
 
-     >;
 
-     mi_offline_message messages_;
 
- };
 
- MQTT_BROKER_NS_END
 
- #endif // MQTT_BROKER_OFFLINE_MESSAGE_HPP
 
 
  |