  // Copyright Wouter van Kleunen 2019
  //
  // Distributed under the Boost Software License, Version 1.0.
  // (See accompanying file LICENSE_1_0.txt or copy at
  // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_BROKER_SUBSCRIPTION_MAP_HPP)
  7. #define MQTT_BROKER_SUBSCRIPTION_MAP_HPP
  #include <algorithm>
  #include <cstddef>
  #include <cstdint>
  #include <limits>
  #include <ostream>
  #include <stdexcept>
  #include <string>
  #include <unordered_map>
  #include <utility>
  #include <vector>

  #include <boost/assert.hpp>
  #include <boost/functional/hash.hpp>
  #include <boost/range/adaptor/reversed.hpp>

  #include <mqtt/broker/broker_namespace.hpp>
  #include <mqtt/string_view.hpp>
  #include <mqtt/optional.hpp>
  #include <mqtt/buffer.hpp>
  #include <mqtt/broker/topic_filter.hpp>
  16. MQTT_BROKER_NS_BEGIN
  /**
   *
   * In MQTT we have:
   * Clients subscribed with certain topic filters; topic filters are paths which may contain wildcards such as
   * + and #
   * · A subscription to "#" will not receive any messages published to a topic beginning with a $
   * · A subscription to "+/monitor/Clients" will not receive any messages published to "$SYS/monitor/Clients"
   * · A subscription to "$SYS/#" will receive messages published to topics beginning with "$SYS/"
   * · A subscription to "$SYS/monitor/Clients/+" will receive messages published to "$SYS/monitor/Clients/"
   * · For a Client to receive messages from topics that begin with $SYS/ and from topics that don't begin with a $, it has to subscribe to both "#" and "$SYS/#"
   * Check whether a string is a valid subscription using 'mqtt_valid_subscription'
   *
   * Topics being published; a topic is a sort of path and does not contain wildcards
   * · $SYS/ has been widely adopted as a prefix to topics that contain Server-specific information or control APIs
   * · Applications cannot use a topic with a leading $ character for their own purposes
   * Check whether a string is a valid topic using 'mqtt_valid_topic'
   *
   *
   * We introduce two data structures:
   * · A subscription map, storing a topic_filter -> data
   * Using a published topic, we can find all topic filters which match the specified topic
   * · A stored topic map, storing topic -> data
   * Using a new topic filter, we can find all stored topics which match the specified topic filter
   *
   * The subscription map stores all entries in a tree.
   * The tree starts from a root node, and topic filters are tokenized and stored in the tree
   *
   * For example if the topic_filter example/monitor/Clients is stored, the following nodes are created:
   * root -> example -> monitor -> Clients
   *
   * Every node in the tree may store one or multiple subscribers. Nodes store a reference count of the number of subscribers,
   * so for example, if we store the following topic_filters:
   * example/
   * example/monitor/Clients
   *
   * the subscription map looks as follows:
   * root(2) -> example(2) -> monitor(1) -> Clients (1)
   *
   * # and + are stored as normal nodes within the tree, but the parent node knows if a # or + child is available. This
   * improves the matching: no extra lookup is required to see if a # or + child is available in a child node:
   *
   * example/#
   *
   * stores the following tree:
   * root -> example (hash: yes) -> #
   *
   * and
   *
   * example/+
   *
   * stores the following tree:
   * root -> example (plus: yes) -> +
   *
   * All node entries are stored in a single hash map. The key for every node is: (parent node id, path)
   *
   * so if we store: root/example/test
   * root (id:1) -> example (id:2, key:1,example) -> test (id:3, key:2,test)
   *
   * Also, every node stores the key of its parent, allowing quick traversal from leaf to root of the tree
   */
  /**
   * Combined storage for a subscription counter and the two wildcard-child flags.
   *
   * The counter and both flags are declared as bit-fields of one unsigned type:
   * 30 + 1 + 1 bits for N == 4 and 62 + 1 + 1 bits for N == 8. Only those two
   * sizes are specialized; instantiating the primary template with any other N
   * is a compile error.
   *
   * Neither increment_value() nor decrement_value() checks the range, so callers
   * have to compare against max() / 0 themselves before changing the counter.
   */
  template<std::size_t N>
  struct count_storage {
      static_assert(N == 4 || N == 8, "Subscription map count_storage only knows how to handle architectures with 32 or 64 bit size_t: please update to support your platform.");
  };

  /// 32 bit variant: a 30 bit counter plus the '#' and '+' child flags.
  template<>
  struct count_storage<4> {
  public:
      /// Start with counter @p v (only the low 30 bits are kept) and both flags cleared.
      count_storage(std::uint32_t v = 1)
          : value_(v & 0x3fffffffUL), has_hash_child_(false), has_plus_child_(false)
      { }

      /// Largest counter value that fits into the 30 bit field.
      static constexpr std::size_t max() { return std::numeric_limits<std::uint32_t>::max() >> 2; }

      /// Current counter value.
      std::uint32_t value() const { return value_; }

      /// Overwrite the counter; bits above the low 30 are discarded.
      void set_value(std::uint32_t v) {
          value_ = v & 0x3fffffffUL;
      }

      /// Add one to the counter (no overflow check).
      void increment_value() {
          ++value_;
      }

      /// Subtract one from the counter (no underflow check).
      void decrement_value() {
          --value_;
      }

      /// True when the owning node has a '#' child.
      bool has_hash_child() const { return has_hash_child_; }

      void set_hash_child(bool v) {
          has_hash_child_ = v;
      }

      /// True when the owning node has a '+' child.
      bool has_plus_child() const { return has_plus_child_; }

      void set_plus_child(bool v) {
          has_plus_child_ = v;
      }

  private:
      std::uint32_t value_ : 30;
      std::uint32_t has_hash_child_ : 1;
      std::uint32_t has_plus_child_ : 1;
  };

  /// 64 bit variant: a 62 bit counter plus the '#' and '+' child flags.
  template<>
  struct count_storage<8> {
  public:
      /// Start with counter @p v (only the low 62 bits are kept) and both flags cleared.
      count_storage(std::uint64_t v = 1)
          : value_(v & 0x3fffffffffffffffULL), has_hash_child_(false), has_plus_child_(false)
      { }

      /// Largest counter value that fits into the 62 bit field.
      static constexpr std::uint64_t max() { return std::numeric_limits<std::uint64_t>::max() >> 2; }

      /// Current counter value.
      std::uint64_t value() const { return value_; }

      /// Overwrite the counter; bits above the low 62 are discarded.
      void set_value(std::uint64_t v) {
          value_ = v & 0x3fffffffffffffffULL;
      }

      /// Add one to the counter (no overflow check).
      void increment_value() {
          ++value_;
      }

      /// Subtract one from the counter (no underflow check).
      void decrement_value() {
          --value_;
      }

      /// True when the owning node has a '#' child.
      bool has_hash_child() const { return has_hash_child_; }

      void set_hash_child(bool v) {
          has_hash_child_ = v;
      }

      /// True when the owning node has a '+' child.
      bool has_plus_child() const { return has_plus_child_; }

      void set_plus_child(bool v) {
          has_plus_child_ = v;
      }

  private:
      std::uint64_t value_ : 62;
      std::uint64_t has_hash_child_ : 1;
      std::uint64_t has_plus_child_ : 1;
  };
  144. template<typename Value>
  145. class subscription_map_base {
  146. public:
  147. using node_id_t = std::size_t;
  148. using path_entry_key = std::pair<node_id_t, buffer>;
  149. using handle = path_entry_key;
  150. private:
  151. // Generate a node id for a new node
  152. node_id_t generate_node_id() {
  153. if(next_node_id == std::numeric_limits<node_id_t>::max())
  154. throw_max_stored_topics();
  155. return ++next_node_id;
  156. }
  157. using count_storage_t = count_storage<sizeof(void *)>;
  158. struct path_entry {
  159. node_id_t id;
  160. path_entry_key parent;
  161. count_storage_t count;
  162. Value value;
  163. path_entry(node_id_t id, path_entry_key parent)
  164. : id(id), parent(parent)
  165. {}
  166. };
  167. // Increase the subscription count for a specific node
  168. static void increase_count_storage(count_storage_t &count) {
  169. if(count.value() == count_storage_t::max()) {
  170. throw_max_stored_topics();
  171. }
  172. count.increment_value();
  173. }
  174. // Decrease the subscription count for a specific node
  175. static void decrease_count_storage(count_storage_t& count) {
  176. BOOST_ASSERT(count.value() > 0);
  177. count.decrement_value();
  178. }
  179. using this_type = subscription_map_base<Value>;
  180. // Use boost hash to hash pair in path_entry_key
  181. using map_type = std::unordered_map< path_entry_key, path_entry, boost::hash< path_entry_key > >;
  182. map_type map;
  183. using map_type_iterator = typename map_type::iterator;
  184. using map_type_const_iterator = typename map_type::const_iterator;
  185. node_id_t next_node_id = 0;
  186. protected:
  187. // Key and id of the root key
  188. path_entry_key root_key;
  189. node_id_t root_node_id;
  190. // Return the iterator of the root
  191. map_type_iterator get_root() { return map.find(root_key); };
  192. map_type_const_iterator get_root() const { return map.find(root_key); };
  193. // Map size tracks the total number of subscriptions within the map
  194. size_t map_size = 0;
  195. map_type_iterator get_key(path_entry_key key) { return map.find(key); }
  196. map_type_iterator begin() { return map.begin(); }
  197. map_type_iterator end() { return map.end(); }
  198. map_type const& get_map() const { return map; }
  199. handle path_to_handle(std::vector< map_type_iterator > const& path) const {
  200. return path.back()->first;
  201. }
  202. std::vector< map_type_iterator> find_topic_filter(string_view topic_filter) {
  203. auto parent_id = get_root()->second.id;
  204. std::vector< map_type_iterator > path;
  205. topic_filter_tokenizer(
  206. topic_filter,
  207. [this, &path, &parent_id](string_view t) mutable {
  208. auto entry = map.find(path_entry_key(parent_id, t));
  209. if (entry == map.end()) {
  210. path.clear();
  211. return false;
  212. }
  213. path.push_back(entry);
  214. parent_id = entry->second.id;
  215. return true;
  216. }
  217. );
  218. return path;
  219. }
  220. std::vector<map_type_iterator> create_topic_filter(string_view topic_filter) {
  221. auto parent = get_root();
  222. std::vector<map_type_iterator> result;
  223. topic_filter_tokenizer(
  224. topic_filter,
  225. [this, &parent, &result](string_view t) mutable {
  226. auto entry = map.find(path_entry_key(parent->second.id, t));
  227. if (entry == map.end()) {
  228. entry =
  229. map.emplace(
  230. path_entry_key(
  231. parent->second.id,
  232. allocate_buffer(t)
  233. ),
  234. path_entry(generate_node_id(), parent->first)
  235. ).first;
  236. parent->second.count.set_plus_child(parent->second.count.has_plus_child() || (t == "+"));
  237. parent->second.count.set_hash_child(parent->second.count.has_hash_child() || (t == "#"));
  238. }
  239. else {
  240. increase_count_storage(entry->second.count);
  241. }
  242. result.push_back(entry);
  243. parent = entry;
  244. return true;
  245. }
  246. );
  247. return result;
  248. }
  249. // Remove a value at the specified path
  250. void remove_topic_filter(std::vector< map_type_iterator > const& path) {
  251. bool remove_plus_child_flag = false;
  252. bool remove_hash_child_flag = false;
  253. // Go through entries to remove
  254. for (auto& entry : boost::adaptors::reverse(path)) {
  255. if (remove_plus_child_flag) {
  256. entry->second.count.set_plus_child(false);
  257. remove_plus_child_flag = false;
  258. }
  259. if (remove_hash_child_flag) {
  260. entry->second.count.set_hash_child(false);
  261. remove_hash_child_flag = false;
  262. }
  263. decrease_count_storage(entry->second.count);
  264. if (entry->second.count.value() == 0) {
  265. remove_plus_child_flag = (entry->first.second == "+");
  266. remove_hash_child_flag = (entry->first.second == "#");
  267. // Erase in unordered map only invalidates erased iterator
  268. // other iterators are unaffected
  269. map.erase(entry->first);
  270. }
  271. }
  272. auto root = get_root();
  273. if (remove_plus_child_flag) {
  274. root->second.count.set_plus_child(false);
  275. }
  276. if (remove_hash_child_flag) {
  277. root->second.count.set_hash_child(false);
  278. }
  279. }
  280. template <typename ThisType, typename Output>
  281. static void find_match_impl(ThisType& self, string_view topic, Output&& callback) {
  282. using iterator_type = decltype(self.map.end()); // const_iterator or iterator depends on self
  283. std::vector<iterator_type> entries;
  284. entries.push_back(self.get_root());
  285. topic_filter_tokenizer(
  286. topic,
  287. [&self, &entries, &callback](string_view t) {
  288. std::vector<iterator_type> new_entries;
  289. for (auto& entry : entries) {
  290. auto parent = entry->second.id;
  291. auto i = self.map.find(path_entry_key(parent, t));
  292. if (i != self.map.end()) {
  293. new_entries.push_back(i);
  294. }
  295. if (entry->second.count .has_plus_child()) {
  296. i = self.map.find(path_entry_key(parent, string_view("+")));
  297. if (i != self.map.end()) {
  298. if (parent != self.root_node_id || t.empty() || t[0] != '$') {
  299. new_entries.push_back(i);
  300. }
  301. }
  302. }
  303. if (entry->second.count.has_hash_child()) {
  304. i = self.map.find(path_entry_key(parent, string_view("#")));
  305. if (i != self.map.end()) {
  306. if (parent != self.root_node_id || t.empty() || t[0] != '$'){
  307. callback(i->second.value);
  308. }
  309. }
  310. }
  311. }
  312. std::swap(entries, new_entries);
  313. return !entries.empty();
  314. }
  315. );
  316. for (auto& entry : entries) {
  317. callback(entry->second.value);
  318. }
  319. }
  320. // Find all topic filters that match the specified topic
  321. template<typename Output>
  322. void find_match(string_view topic, Output&& callback) const {
  323. find_match_impl(*this, topic, std::forward<Output>(callback));
  324. }
  325. // Find all topic filters and allow modification
  326. template<typename Output>
  327. void modify_match(string_view topic, Output&& callback) {
  328. find_match_impl(*this, topic, std::forward<Output>(callback));
  329. }
  330. template<typename ThisType, typename Output>
  331. static void handle_to_iterators(ThisType& self, handle const &h, Output&& output) {
  332. auto i = h;
  333. while(i != self.root_key) {
  334. auto entry_iter = self.map.find(i);
  335. if (entry_iter == self.map.end()) {
  336. throw_invalid_handle();
  337. }
  338. output(entry_iter);
  339. i = entry_iter->second.parent;
  340. }
  341. }
  342. // Exceptions used
  343. static void throw_invalid_topic_filter() { throw std::runtime_error("Subscription map invalid topic filter was specified"); }
  344. static void throw_invalid_handle() { throw std::runtime_error("Subscription map invalid handle was specified"); }
  345. static void throw_max_stored_topics() { throw std::overflow_error("Subscription map maximum number of stored topic filters reached"); }
  346. // Get the iterators of a handle
  347. std::vector<map_type_iterator> handle_to_iterators(handle const &h) {
  348. std::vector<map_type_iterator> result;
  349. handle_to_iterators(*this, h, [&result](map_type_iterator i) { result.push_back(i); });
  350. std::reverse(result.begin(), result.end());
  351. return result;
  352. }
  353. // Increase the number of subscriptions for this handle
  354. void increase_subscriptions(handle const &h) {
  355. handle_to_iterators(*this, h, [](map_type_iterator i) {
  356. increase_count_storage(i->second.count);
  357. });
  358. }
  359. // Increase the map size (total number of subscriptions stored)
  360. void increase_map_size() {
  361. if(map_size == std::numeric_limits<decltype(map_size)>::max()) {
  362. throw_max_stored_topics();
  363. }
  364. ++map_size;
  365. }
  366. // Decrease the map size (total number of subscriptions stored)
  367. void decrease_map_size() {
  368. BOOST_ASSERT(map_size > 0);
  369. --map_size;
  370. }
  371. // Increase the number of subscriptions for this path
  372. void increase_subscriptions(std::vector<map_type_iterator> const &path) {
  373. for (auto i : path) {
  374. increase_count_storage(i->second.count);
  375. }
  376. }
  377. subscription_map_base()
  378. {
  379. // Create the root node
  380. root_node_id = generate_node_id();
  381. root_key = path_entry_key(generate_node_id(), buffer());
  382. map.emplace(root_key, path_entry(root_node_id, path_entry_key()));
  383. }
  384. public:
  385. // Return the number of elements in the tree
  386. std::size_t internal_size() const { return map.size(); }
  387. // Return the number of registered topic filters
  388. std::size_t size() const { return this->map_size; }
  389. // Lookup a topic filter
  390. optional<handle> lookup(string_view topic_filter) {
  391. auto path = this->find_topic_filter(topic_filter);
  392. if(path.empty())
  393. return optional<handle>();
  394. else
  395. return this->path_to_handle(force_move(path));
  396. }
  397. // Get path of topic_filter
  398. std::string handle_to_topic_filter(handle const &h) const {
  399. std::string result;
  400. handle_to_iterators(*this, h, [&result](map_type_const_iterator i) {
  401. if (result.empty()) {
  402. result = std::string(i->first.second);
  403. }
  404. else {
  405. result = std::string(i->first.second) + "/" + result;
  406. }
  407. });
  408. return result;
  409. }
  410. };
  411. template<typename Value>
  412. class single_subscription_map
  413. : public subscription_map_base< optional<Value> > {
  414. public:
  415. // Handle of an entry
  416. using handle = typename subscription_map_base< Value >::handle;
  417. // Insert a value at the specified topic_filter
  418. template <typename V>
  419. std::pair<handle, bool> insert(string_view topic_filter, V&& value) {
  420. auto existing_subscription = this->find_topic_filter(topic_filter);
  421. if (!existing_subscription.empty()) {
  422. if(existing_subscription.back()->second.value)
  423. return std::make_pair(this->path_to_handle(force_move(existing_subscription)), false);
  424. existing_subscription.back()->second.value.emplace(std::forward<V>(value));
  425. return std::make_pair(this->path_to_handle(force_move(existing_subscription)), true);
  426. }
  427. auto new_topic_filter = this->create_topic_filter(topic_filter);
  428. new_topic_filter.back()->second.value = value;
  429. this->increase_map_size();
  430. return std::make_pair(this->path_to_handle(force_move(new_topic_filter)), true);
  431. }
  432. // Update a value at the specified topic filter
  433. template <typename V>
  434. void update(string_view topic_filter, V&& value) {
  435. auto path = this->find_topic_filter(topic_filter);
  436. if (path.empty()) {
  437. this->throw_invalid_topic_filter();
  438. }
  439. path.back()->second.value.emplace(std::forward<V>(value));
  440. }
  441. template <typename V>
  442. void update(handle const &h, V&& value) {
  443. auto entry_iter = this->get_key(h);
  444. if (entry_iter == this->end()) {
  445. this->throw_invalid_topic_filter();
  446. }
  447. entry_iter->second.value.emplace(std::forward<V>(value));
  448. }
  449. // Remove a value at the specified topic filter
  450. std::size_t erase(string_view topic_filter) {
  451. auto path = this->find_topic_filter(topic_filter);
  452. if (path.empty() || !path.back()->second.value) {
  453. return 0;
  454. }
  455. this->remove_topic_filter(path);
  456. this->decrease_map_size();
  457. return 1;
  458. }
  459. // Remove a value using a handle
  460. std::size_t erase(handle const &h) {
  461. auto path = this->handle_to_iterators(h);
  462. if (path.empty() || !path.back()->second.value) {
  463. return 0;
  464. }
  465. this->remove_topic_filter(path);
  466. this->decrease_map_size();
  467. return 1;
  468. }
  469. // Find all topic filters that match the specified topic
  470. template<typename Output>
  471. void find(string_view topic, Output&& callback) const {
  472. this->find_match(
  473. topic,
  474. [&callback]( optional<Value> const& value ) {
  475. if (value) {
  476. callback(value.value());
  477. }
  478. }
  479. );
  480. }
  481. };
  482. template<typename Key, typename Value, class Hash = std::hash<Key>, class Pred = std::equal_to<Key>, class Cont = std::unordered_map<Key, Value, Hash, Pred, std::allocator< std::pair<const Key, Value> > > >
  483. class multiple_subscription_map
  484. : public subscription_map_base< Cont >
  485. {
  486. public:
  487. using container_t = Cont;
  488. // Handle of an entry
  489. using handle = typename subscription_map_base< Value >::handle;
  490. // Insert a key => value at the specified topic filter
  491. // returns the handle and true if key was inserted, false if key was updated
  492. template <typename K, typename V>
  493. std::pair<handle, bool> insert_or_assign(string_view topic_filter, K&& key, V&& value) {
  494. auto path = this->find_topic_filter(topic_filter);
  495. if (path.empty()) {
  496. auto new_topic_filter = this->create_topic_filter(topic_filter);
  497. new_topic_filter.back()->second.value.emplace(std::forward<K>(key), std::forward<V>(value));
  498. this->increase_map_size();
  499. return std::make_pair(this->path_to_handle(force_move(new_topic_filter)), true);
  500. }
  501. else {
  502. auto& subscription_set = path.back()->second.value;
  503. #if __cpp_lib_unordered_map_try_emplace >= 201411L
  504. auto insert_result = subscription_set.insert_or_assign(std::forward<K>(key), std::forward<V>(value));
  505. if(insert_result.second) {
  506. this->increase_subscriptions(path);
  507. this->increase_map_size();
  508. }
  509. return std::make_pair(this->path_to_handle(force_move(path)), insert_result.second);
  510. #else
  511. auto iter = subscription_set.find(key);
  512. if(iter == subscription_set.end()) {
  513. subscription_set.emplace(std::forward<K>(key), std::forward<V>(value));
  514. this->increase_subscriptions(path);
  515. this->increase_map_size();
  516. } else {
  517. iter->second = std::forward<V>(value);
  518. }
  519. return std::make_pair(this->path_to_handle(force_move(path)), iter == subscription_set.end());
  520. #endif
  521. }
  522. }
  523. // Insert a key => value with a handle to the topic filter
  524. // returns the handle and true if key was inserted, false if key was updated
  525. template <typename K, typename V>
  526. std::pair<handle, bool> insert_or_assign(handle const &h, K&& key, V&& value) {
  527. auto h_iter = this->get_key(h);
  528. if (h_iter == this->end()) {
  529. this->throw_invalid_handle();
  530. }
  531. auto& subscription_set = h_iter->second.value;
  532. #if __cpp_lib_unordered_map_try_emplace >= 201411L
  533. auto insert_result = subscription_set.insert_or_assign(std::forward<K>(key), std::forward<V>(value));
  534. if(insert_result.second) {
  535. this->increase_subscriptions(h);
  536. this->increase_map_size();
  537. }
  538. return std::make_pair(h, insert_result.second);
  539. #else
  540. auto iter = subscription_set.find(key);
  541. if(iter == subscription_set.end()) {
  542. subscription_set.emplace(std::forward<K>(key), std::forward<V>(value));
  543. this->increase_subscriptions(h);
  544. this->increase_map_size();
  545. } else {
  546. iter->second = std::forward<V>(value);
  547. }
  548. return std::make_pair(h, iter == subscription_set.end());
  549. #endif
  550. }
  551. // Remove a value at the specified handle
  552. // returns the number of removed elements
  553. std::size_t erase(handle const &h, Key const& key) {
  554. // Find the handle in the map
  555. auto h_iter = this->get_key(h);
  556. if (h_iter == this->end()) {
  557. this->throw_invalid_handle();
  558. }
  559. // Remove the specified value
  560. auto result = h_iter->second.value.erase(key);
  561. if (result) {
  562. this->remove_topic_filter(this->handle_to_iterators(h));
  563. this->decrease_map_size();
  564. }
  565. return result;
  566. }
  567. // Remove a value at the specified topic filter
  568. // returns the number of removed elements
  569. std::size_t erase(string_view topic_filter, Key const& key) {
  570. // Find the topic filter in the map
  571. auto path = this->find_topic_filter(topic_filter);
  572. if (path.empty()) {
  573. return 0;
  574. }
  575. // Remove the specified value
  576. auto result = path.back()->second.value.erase(key);
  577. if (result) {
  578. this->decrease_map_size();
  579. this->remove_topic_filter(path);
  580. }
  581. return result;
  582. }
  583. // Find all topic filters that match the specified topic
  584. template<typename Output>
  585. void find(string_view topic, Output&& callback) const {
  586. this->find_match(
  587. topic,
  588. [&callback]( Cont const &values ) {
  589. for (auto const& i : values) {
  590. callback(i.first, i.second);
  591. }
  592. }
  593. );
  594. }
  595. // Find all topic filters that match and allow modification
  596. template<typename Output>
  597. void modify(string_view topic, Output&& callback) {
  598. this->modify_match(
  599. topic,
  600. [&callback]( Cont &values ) {
  601. for (auto& i : values) {
  602. callback(i.first, i.second);
  603. }
  604. }
  605. );
  606. }
  607. template<typename Output>
  608. void dump(Output &out) {
  609. out << "Root node id: " << this->root_node_id << std::endl;
  610. for (auto const& i: this->get_map()) {
  611. out << "(" << i.first.first << ", " << i.first.second << "): id: " << i.second.id << ", size: " << i.second.value.size() << ", value: " << i.second.count.value << std::endl;
  612. }
  613. }
  614. };
  615. MQTT_BROKER_NS_END
  616. #endif // MQTT_BROKER_SUBSCRIPTION_MAP_HPP