| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742 | // Copyright Wouter van Kleunen 2019//// Distributed under the Boost Software License, Version 1.0.// (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)#if !defined(MQTT_BROKER_SUBSCRIPTION_MAP_HPP)#define MQTT_BROKER_SUBSCRIPTION_MAP_HPP#include <unordered_map>#include <boost/functional/hash.hpp>#include <boost/range/adaptor/reversed.hpp>#include <mqtt/broker/broker_namespace.hpp>#include <mqtt/string_view.hpp>#include <mqtt/optional.hpp>#include <mqtt/buffer.hpp>#include <mqtt/broker/topic_filter.hpp>MQTT_BROKER_NS_BEGIN/** * * In MQTT we have: *    Clients subscribed with certain topic filters, topic filters are path with may contain wildcards such as *    + and # *       . A subscription to "#" will not receive any messages published to a topic beginning with a $ *       · A subscription to "+/monitor/Clients" will not receive any messages published to "$SYS/monitor/Clients" *       · A subscription to "$SYS/#" will receive messages published to topics beginning with "$SYS/" *       · A subscription to "$SYS/monitor/Clients/+" will receive messages published to "$SYS/monitor/Clients/" *       · For a Client to receive messages from topics that begin with $SYS/ and from topics that don’t begin with a $, it has to subscribe to both “#” and “$SYS/#” *    Check whether a string is a valid subscription using 'mqtt_valid_subscription' * *    Topics being published, a topic is a sort of path and does not contain wildcards *       · $SYS/ has been widely adopted as a prefix to topics that contain Server-specific information or control APIs *       · Applications cannot use a topic with a leading $ character for their own purposes *    Check whether a string is a valid topic using 'mqtt_valid_topic' * * *    We introduce two data structures: *       . A subscription map, storing a topic_filter -> data *           Using a published topic, we can find all topic filters which match the specified topic *       . A stored topic map, storing topic -> data *           Using a new topic filter, we can find all stored topics which match the specified topic filter * *   Subscription map stores all entries in a tree *   the tree starts from a root node, and topic filters are tokenized and stored in the tree * *   For example if the topic_filter example/monitor/Clients is stored, the following nodes are created: *   root -> example -> monitor -> Clients * *   Every node in the tree may store one or multiple subscribers. Nodes store a reference count to the number of subscribers *   so for example, if we store the following topic_filters: *     example/ *     example/monitor/Clients * *   the subscription map looks as follows: *     root(2) -> example(2) -> monitor(1) -> Clients (1) * *   hash and + are stored as normal nodes within the tree, but the parent node knows if a hash child is available. This *   improves the matching, no extra lookup is required to see if a # or + child is available in a child node: * *     example/# * *    stores the following tree: *      root -> example (hash: yes) -> # * *    and * *      example/+ * *    stores the following tree: *      root -> example (plus: yes) -> # * *    all node entries are stored in a single hash map. The key for every node is: (parent node id, path) * *      so if we store: root/example/test *      root (id:1) -> example (id:2, key:1,example) -> test (id:3, key:2,test) * *    also, every node stores the key of its parent, allowing quick traversing from leaf to root of the tree */// Combined storage for count and flags// we can have 32bit or 64bit version// Compile error on other platforms (not 32 or 64 bit)template<std::size_t N>struct count_storage {   static_assert(N == 4 || N == 8, "Subscription map count_storage only knows how to handle architectures with 32 or 64 bit size_t: please update to support your platform.");};template<>struct count_storage<4> {public:    count_storage(std::uint32_t v = 1)        : value_(v & 0x3fffffffUL), has_hash_child_(false), has_plus_child_(false)    { }    static constexpr std::size_t max() { return std::numeric_limits<uint32_t>::max() >> 2; }    std::uint32_t value() const { return value_; }    void set_value(std::uint32_t v) {        value_ = v & 0x3fffffffUL;    }    void increment_value() {        ++value_;    }    void decrement_value() {        --value_;    }    bool has_hash_child() const { return has_hash_child_; }    void set_hash_child(bool v) {        has_hash_child_ = v;    }    bool has_plus_child() const { return has_plus_child_; }    void set_plus_child(bool v) {        has_plus_child_ = v;    }private:    std::uint32_t value_ : 30;    std::uint32_t has_hash_child_ : 1;    std::uint32_t has_plus_child_ : 1;};template<>struct count_storage<8> {public:    count_storage(std::uint64_t v= 1)        : value_(v & 0x3fffffffffffffffULL), has_hash_child_(false), has_plus_child_(false)    { }    static constexpr std::uint64_t max() { return std::numeric_limits<uint64_t>::max() >> 2; }    std::uint64_t value() const { return value_; }    void set_value(std::uint64_t v) {        value_ = v & 0x3fffffffffffffffULL;    }    void increment_value() {        ++value_;    }    void decrement_value() {        --value_;    }    bool has_hash_child() const { return has_hash_child_; }    void set_hash_child(bool v) {        has_hash_child_ = v;    }    bool has_plus_child() const { return has_plus_child_; }    void set_plus_child(bool v) {        has_plus_child_ = v;    }private:    std::uint64_t value_ : 62;    std::uint64_t has_hash_child_ : 1;    std::uint64_t has_plus_child_ : 1;};template<typename Value>class subscription_map_base {public:    using node_id_t = std::size_t;    using path_entry_key = std::pair<node_id_t, buffer>;    using handle = path_entry_key;private:    // Generate a node id for a new node    node_id_t generate_node_id() {        if(next_node_id == std::numeric_limits<node_id_t>::max())            throw_max_stored_topics();         return ++next_node_id;    }    using count_storage_t = count_storage<sizeof(void *)>;    struct path_entry {        node_id_t id;        path_entry_key parent;        count_storage_t count;        Value value;        path_entry(node_id_t id, path_entry_key parent)            : id(id), parent(parent)        {}    };    // Increase the subscription count for a specific node    static void increase_count_storage(count_storage_t &count) {        if(count.value() == count_storage_t::max()) {            throw_max_stored_topics();        }        count.increment_value();    }    // Decrease the subscription count for a specific node    static void decrease_count_storage(count_storage_t& count) {        BOOST_ASSERT(count.value() > 0);        count.decrement_value();    }    using this_type = subscription_map_base<Value>;    // Use boost hash to hash pair in path_entry_key    using map_type = std::unordered_map< path_entry_key, path_entry, boost::hash< path_entry_key > >;    map_type map;    using map_type_iterator = typename map_type::iterator;    using map_type_const_iterator = typename map_type::const_iterator;    node_id_t next_node_id = 0;protected:    // Key and id of the root key    path_entry_key root_key;    node_id_t root_node_id;    // Return the iterator of the root    map_type_iterator get_root() { return map.find(root_key); };    map_type_const_iterator get_root() const { return map.find(root_key); };    // Map size tracks the total number of subscriptions within the map    size_t map_size = 0;    map_type_iterator get_key(path_entry_key key) { return map.find(key); }    map_type_iterator begin() { return map.begin(); }    map_type_iterator end() { return map.end(); }    map_type const& get_map() const { return map; }    handle path_to_handle(std::vector< map_type_iterator > const& path) const {        return path.back()->first;    }    std::vector< map_type_iterator> find_topic_filter(string_view topic_filter) {        auto parent_id = get_root()->second.id;        std::vector< map_type_iterator > path;        topic_filter_tokenizer(            topic_filter,            [this, &path, &parent_id](string_view t) mutable {                auto entry = map.find(path_entry_key(parent_id, t));                if (entry == map.end()) {                    path.clear();                    return false;                }                path.push_back(entry);                parent_id = entry->second.id;                return true;            }        );        return path;    }    std::vector<map_type_iterator> create_topic_filter(string_view topic_filter) {        auto parent = get_root();        std::vector<map_type_iterator> result;        topic_filter_tokenizer(            topic_filter,            [this, &parent, &result](string_view t) mutable {                auto entry = map.find(path_entry_key(parent->second.id, t));                if (entry == map.end()) {                    entry =                        map.emplace(                            path_entry_key(                                parent->second.id,                                allocate_buffer(t)                            ),                            path_entry(generate_node_id(), parent->first)                        ).first;                    parent->second.count.set_plus_child(parent->second.count.has_plus_child() || (t == "+"));                    parent->second.count.set_hash_child(parent->second.count.has_hash_child() || (t == "#"));                }                else {                    increase_count_storage(entry->second.count);                }                result.push_back(entry);                parent = entry;                return true;            }        );        return result;    }    // Remove a value at the specified path    void remove_topic_filter(std::vector< map_type_iterator > const& path) {        bool remove_plus_child_flag = false;        bool remove_hash_child_flag = false;        // Go through entries to remove        for (auto& entry : boost::adaptors::reverse(path)) {            if (remove_plus_child_flag) {                entry->second.count.set_plus_child(false);                remove_plus_child_flag = false;            }            if (remove_hash_child_flag) {                entry->second.count.set_hash_child(false);                remove_hash_child_flag = false;            }            decrease_count_storage(entry->second.count);            if (entry->second.count.value() == 0) {                remove_plus_child_flag = (entry->first.second == "+");                remove_hash_child_flag = (entry->first.second == "#");                // Erase in unordered map only invalidates erased iterator                // other iterators are unaffected                map.erase(entry->first);            }        }        auto root = get_root();        if (remove_plus_child_flag) {            root->second.count.set_plus_child(false);        }        if (remove_hash_child_flag) {            root->second.count.set_hash_child(false);        }    }    template <typename ThisType, typename Output>    static void find_match_impl(ThisType& self, string_view topic, Output&& callback) {        using iterator_type = decltype(self.map.end()); // const_iterator or iterator depends on self        std::vector<iterator_type> entries;        entries.push_back(self.get_root());        topic_filter_tokenizer(            topic,            [&self, &entries, &callback](string_view t) {                std::vector<iterator_type> new_entries;                for (auto& entry : entries) {                    auto parent = entry->second.id;                    auto i = self.map.find(path_entry_key(parent, t));                    if (i != self.map.end()) {                        new_entries.push_back(i);                    }                    if (entry->second.count .has_plus_child()) {                        i = self.map.find(path_entry_key(parent, string_view("+")));                        if (i != self.map.end()) {                            if (parent != self.root_node_id || t.empty() || t[0] != '$') {                                new_entries.push_back(i);                            }                        }                    }                    if (entry->second.count.has_hash_child()) {                        i = self.map.find(path_entry_key(parent, string_view("#")));                        if (i != self.map.end()) {                            if (parent != self.root_node_id || t.empty() || t[0] != '$'){                                callback(i->second.value);                            }                        }                    }                }                std::swap(entries, new_entries);                return !entries.empty();            }        );        for (auto& entry : entries) {            callback(entry->second.value);        }    }    // Find all topic filters that match the specified topic    template<typename Output>    void find_match(string_view topic, Output&& callback) const {        find_match_impl(*this, topic, std::forward<Output>(callback));    }    // Find all topic filters and allow modification    template<typename Output>    void modify_match(string_view topic, Output&& callback) {        find_match_impl(*this, topic, std::forward<Output>(callback));    }    template<typename ThisType, typename Output>    static void handle_to_iterators(ThisType& self, handle const &h, Output&& output) {        auto i = h;        while(i != self.root_key) {            auto entry_iter = self.map.find(i);            if (entry_iter == self.map.end()) {                throw_invalid_handle();            }            output(entry_iter);            i = entry_iter->second.parent;        }    }    // Exceptions used    static void throw_invalid_topic_filter() { throw std::runtime_error("Subscription map invalid topic filter was specified"); }    static void throw_invalid_handle() { throw std::runtime_error("Subscription map invalid handle was specified"); }    static void throw_max_stored_topics() { throw std::overflow_error("Subscription map maximum number of stored topic filters reached"); }    // Get the iterators of a handle    std::vector<map_type_iterator> handle_to_iterators(handle const &h) {        std::vector<map_type_iterator> result;        handle_to_iterators(*this, h, [&result](map_type_iterator i) { result.push_back(i); });        std::reverse(result.begin(), result.end());        return result;    }    // Increase the number of subscriptions for this handle    void increase_subscriptions(handle const &h) {        handle_to_iterators(*this, h, [](map_type_iterator i) {            increase_count_storage(i->second.count);        });    }    // Increase the map size (total number of subscriptions stored)    void increase_map_size() {        if(map_size == std::numeric_limits<decltype(map_size)>::max()) {            throw_max_stored_topics();        }        ++map_size;    }    // Decrease the map size (total number of subscriptions stored)    void decrease_map_size() {        BOOST_ASSERT(map_size > 0);        --map_size;    }    // Increase the number of subscriptions for this path    void increase_subscriptions(std::vector<map_type_iterator> const &path) {        for (auto i : path) {            increase_count_storage(i->second.count);        }    }    subscription_map_base()    {        // Create the root node        root_node_id = generate_node_id();        root_key = path_entry_key(generate_node_id(), buffer());        map.emplace(root_key, path_entry(root_node_id, path_entry_key()));    }public:    // Return the number of elements in the tree    std::size_t internal_size() const { return map.size(); }    // Return the number of registered topic filters    std::size_t size() const { return this->map_size; }    // Lookup a topic filter    optional<handle> lookup(string_view topic_filter) {        auto path = this->find_topic_filter(topic_filter);        if(path.empty())            return optional<handle>();        else            return this->path_to_handle(force_move(path));    }    // Get path of topic_filter    std::string handle_to_topic_filter(handle const &h) const {        std::string result;        handle_to_iterators(*this, h, [&result](map_type_const_iterator i) {            if (result.empty()) {                result = std::string(i->first.second);            }            else {                result = std::string(i->first.second) + "/" + result;            }        });        return result;    }};template<typename Value>class single_subscription_map    : public subscription_map_base< optional<Value> > {public:    // Handle of an entry    using handle = typename subscription_map_base< Value >::handle;    // Insert a value at the specified topic_filter    template <typename V>    std::pair<handle, bool> insert(string_view topic_filter, V&& value) {        auto existing_subscription = this->find_topic_filter(topic_filter);        if (!existing_subscription.empty()) {            if(existing_subscription.back()->second.value)                return std::make_pair(this->path_to_handle(force_move(existing_subscription)), false);            existing_subscription.back()->second.value.emplace(std::forward<V>(value));            return std::make_pair(this->path_to_handle(force_move(existing_subscription)), true);        }        auto new_topic_filter = this->create_topic_filter(topic_filter);        new_topic_filter.back()->second.value = value;        this->increase_map_size();        return std::make_pair(this->path_to_handle(force_move(new_topic_filter)), true);    }    // Update a value at the specified topic filter    template <typename V>    void update(string_view topic_filter, V&& value) {        auto path = this->find_topic_filter(topic_filter);        if (path.empty()) {            this->throw_invalid_topic_filter();        }        path.back()->second.value.emplace(std::forward<V>(value));    }    template <typename V>    void update(handle const &h, V&& value) {        auto entry_iter = this->get_key(h);        if (entry_iter == this->end()) {            this->throw_invalid_topic_filter();        }        entry_iter->second.value.emplace(std::forward<V>(value));    }    // Remove a value at the specified topic filter    std::size_t erase(string_view topic_filter) {        auto path = this->find_topic_filter(topic_filter);        if (path.empty() || !path.back()->second.value) {            return 0;        }        this->remove_topic_filter(path);        this->decrease_map_size();        return 1;    }    // Remove a value using a handle    std::size_t erase(handle const &h) {        auto path = this->handle_to_iterators(h);        if (path.empty() || !path.back()->second.value) {            return 0;        }        this->remove_topic_filter(path);        this->decrease_map_size();        return 1;    }    // Find all topic filters that match the specified topic    template<typename Output>    void find(string_view topic, Output&& callback) const {        this->find_match(            topic,            [&callback]( optional<Value> const& value ) {                if (value) {                    callback(value.value());                }            }        );    }};template<typename Key, typename Value, class Hash = std::hash<Key>, class Pred = std::equal_to<Key>, class Cont = std::unordered_map<Key, Value, Hash, Pred, std::allocator< std::pair<const Key, Value> > > >class multiple_subscription_map    : public subscription_map_base< Cont >{public:    using container_t = Cont;    // Handle of an entry    using handle = typename subscription_map_base< Value >::handle;    // Insert a key => value at the specified topic filter    // returns the handle and true if key was inserted, false if key was updated    template <typename K, typename V>    std::pair<handle, bool> insert_or_assign(string_view topic_filter, K&& key, V&& value) {        auto path = this->find_topic_filter(topic_filter);        if (path.empty()) {            auto new_topic_filter = this->create_topic_filter(topic_filter);            new_topic_filter.back()->second.value.emplace(std::forward<K>(key), std::forward<V>(value));            this->increase_map_size();            return std::make_pair(this->path_to_handle(force_move(new_topic_filter)), true);        }        else {            auto& subscription_set = path.back()->second.value;#if __cpp_lib_unordered_map_try_emplace >= 201411L            auto insert_result = subscription_set.insert_or_assign(std::forward<K>(key), std::forward<V>(value));            if(insert_result.second) {                this->increase_subscriptions(path);                this->increase_map_size();            }            return std::make_pair(this->path_to_handle(force_move(path)), insert_result.second);#else            auto iter = subscription_set.find(key);            if(iter == subscription_set.end()) {                subscription_set.emplace(std::forward<K>(key), std::forward<V>(value));                this->increase_subscriptions(path);                this->increase_map_size();            } else {                iter->second = std::forward<V>(value);            }            return std::make_pair(this->path_to_handle(force_move(path)), iter == subscription_set.end());#endif        }    }    // Insert a key => value with a handle to the topic filter    // returns the handle and true if key was inserted, false if key was updated    template <typename K, typename V>    std::pair<handle, bool> insert_or_assign(handle const &h, K&& key, V&& value) {        auto h_iter = this->get_key(h);        if (h_iter == this->end()) {            this->throw_invalid_handle();        }        auto& subscription_set = h_iter->second.value;#if __cpp_lib_unordered_map_try_emplace >= 201411L        auto insert_result = subscription_set.insert_or_assign(std::forward<K>(key), std::forward<V>(value));        if(insert_result.second) {            this->increase_subscriptions(h);            this->increase_map_size();        }        return std::make_pair(h, insert_result.second);#else        auto iter = subscription_set.find(key);        if(iter == subscription_set.end()) {            subscription_set.emplace(std::forward<K>(key), std::forward<V>(value));            this->increase_subscriptions(h);            this->increase_map_size();        } else {            iter->second = std::forward<V>(value);        }        return std::make_pair(h, iter == subscription_set.end());#endif    }    // Remove a value at the specified handle    // returns the number of removed elements    std::size_t erase(handle const &h, Key const& key) {        // Find the handle in the map        auto h_iter = this->get_key(h);        if (h_iter == this->end()) {            this->throw_invalid_handle();        }        // Remove the specified value        auto result = h_iter->second.value.erase(key);        if (result) {            this->remove_topic_filter(this->handle_to_iterators(h));            this->decrease_map_size();        }        return result;    }    // Remove a value at the specified topic filter    // returns the number of removed elements    std::size_t erase(string_view topic_filter, Key const& key) {        // Find the topic filter in the map        auto path = this->find_topic_filter(topic_filter);        if (path.empty()) {            return 0;        }        // Remove the specified value        auto result = path.back()->second.value.erase(key);        if (result) {            this->decrease_map_size();            this->remove_topic_filter(path);        }        return result;    }    // Find all topic filters that match the specified topic    template<typename Output>    void find(string_view topic, Output&& callback) const {        this->find_match(            topic,            [&callback]( Cont const &values ) {                for (auto const& i : values) {                    callback(i.first, i.second);                }            }        );    }    // Find all topic filters that match and allow modification    template<typename Output>    void modify(string_view topic, Output&& callback) {        this->modify_match(            topic,            [&callback]( Cont &values ) {                for (auto& i : values) {                    callback(i.first, i.second);                }            }        );    }    template<typename Output>    void dump(Output &out) {        out << "Root node id: " << this->root_node_id << std::endl;        for (auto const& i: this->get_map()) {            out << "(" << i.first.first << ", " << i.first.second << "): id: " << i.second.id << ", size: " << i.second.value.size() << ", value: " << i.second.count.value << std::endl;        }    }};MQTT_BROKER_NS_END#endif // MQTT_BROKER_SUBSCRIPTION_MAP_HPP
 |