retained_topic_map.hpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. // Copyright Wouter van Kleunen 2019
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_BROKER_RETAINED_TOPIC_MAP_HPP)
  7. #define MQTT_BROKER_RETAINED_TOPIC_MAP_HPP
  8. #include <deque>
  9. #include <boost/functional/hash.hpp>
  10. #include <boost/multi_index_container.hpp>
  11. #include <boost/multi_index/ordered_index.hpp>
  12. #include <boost/multi_index/hashed_index.hpp>
  13. #include <boost/multi_index/composite_key.hpp>
  14. #include <boost/multi_index/member.hpp>
  15. #include <mqtt/broker/broker_namespace.hpp>
  16. #include <mqtt/string_view.hpp>
  17. #include <mqtt/optional.hpp>
  18. #include <mqtt/buffer.hpp>
  19. #include <mqtt/broker/topic_filter.hpp>
  20. MQTT_BROKER_NS_BEGIN
  21. namespace mi = boost::multi_index;
  22. template<typename Value>
  23. class retained_topic_map {
  24. // Exceptions used
  25. static void throw_max_stored_topics() { throw std::overflow_error("Retained map maximum number of topics reached"); }
  26. static void throw_no_wildcards_allowed() { throw std::runtime_error("Retained map no wildcards allowed in retained topic name"); }
  27. using node_id_t = std::size_t;
  28. static constexpr node_id_t root_parent_id = 0;
  29. static constexpr node_id_t root_node_id = 1;
  30. static constexpr node_id_t max_node_id = std::numeric_limits<node_id_t>::max();
  31. struct path_entry {
  32. node_id_t parent_id;
  33. buffer name_buffer;
  34. string_view name;
  35. node_id_t id;
  36. std::size_t count = 1;
  37. static constexpr std::size_t max_count = std::numeric_limits<std::size_t>::max();
  38. // Increase the count for this node
  39. void increase_count() {
  40. if (count == max_count) {
  41. throw_max_stored_topics();
  42. }
  43. ++count;
  44. }
  45. // Decrease the count for this node
  46. void decrease_count() {
  47. BOOST_ASSERT(count >= count);
  48. --count;
  49. }
  50. optional<Value> value;
  51. path_entry(node_id_t parent_id, string_view name, node_id_t id)
  52. : parent_id(parent_id), name_buffer(allocate_buffer(name)), name(name_buffer), id(id)
  53. { }
  54. };
  55. struct wildcard_index_tag { };
  56. struct direct_index_tag { };
  57. // allow for two indices on retained topics
  58. using path_entry_set = mi::multi_index_container<
  59. path_entry,
  60. mi::indexed_by<
  61. // index required for direct child access
  62. mi::hashed_unique <
  63. mi::tag<direct_index_tag>,
  64. mi::composite_key<path_entry,
  65. BOOST_MULTI_INDEX_MEMBER(path_entry, node_id_t, parent_id),
  66. BOOST_MULTI_INDEX_MEMBER(path_entry, string_view, name) >
  67. >,
  68. // index required for wildcard processing
  69. mi::ordered_non_unique< mi::tag<wildcard_index_tag>, BOOST_MULTI_INDEX_MEMBER(path_entry, node_id_t, parent_id) >
  70. >
  71. >;
  72. using direct_const_iterator = typename path_entry_set::template index<direct_index_tag>::type::const_iterator;
  73. using wildcard_const_iterator = typename path_entry_set::template index<wildcard_index_tag>::type::const_iterator;
  74. path_entry_set map;
  75. size_t map_size;
  76. node_id_t next_node_id;
  77. direct_const_iterator root;
  78. direct_const_iterator create_topic(string_view topic) {
  79. direct_const_iterator parent = root;
  80. topic_filter_tokenizer(
  81. topic,
  82. [this, &parent](string_view t) {
  83. if (t == "+" || t == "#") {
  84. throw_no_wildcards_allowed();
  85. }
  86. node_id_t parent_id = parent->id;
  87. auto& direct_index = map.template get<direct_index_tag>();
  88. direct_const_iterator entry = direct_index.find(std::make_tuple(parent_id, t));
  89. if (entry == direct_index.end()) {
  90. entry = map.insert(path_entry(parent->id, t, next_node_id++)).first;
  91. if (next_node_id == max_node_id) {
  92. throw_max_stored_topics();
  93. }
  94. }
  95. else {
  96. direct_index.modify(entry, [](path_entry& entry){ entry.increase_count(); });
  97. }
  98. parent = entry;
  99. return true;
  100. }
  101. );
  102. return parent;
  103. }
  104. std::vector<direct_const_iterator> find_topic(string_view topic) {
  105. std::vector<direct_const_iterator> path;
  106. direct_const_iterator parent = root;
  107. topic_filter_tokenizer(
  108. topic,
  109. [this, &parent, &path](string_view t) {
  110. auto const& direct_index = map.template get<direct_index_tag>();
  111. auto entry = direct_index.find(std::make_tuple(parent->id, t));
  112. if (entry == direct_index.end()) {
  113. path = std::vector<direct_const_iterator>();
  114. return false;
  115. }
  116. path.push_back(entry);
  117. parent = entry;
  118. return true;
  119. }
  120. );
  121. return path;
  122. }
  123. // Match all underlying topics when a hash entry is matched
  124. // perform a breadth-first iteration over all items in the tree below
  125. template<typename Output>
  126. void match_hash_entries(node_id_t parent, Output&& callback, bool ignore_system) const {
  127. std::deque<node_id_t> entries;
  128. entries.push_back(parent);
  129. std::deque<node_id_t> new_entries;
  130. auto const& wildcard_index = map.template get<wildcard_index_tag>();
  131. while (!entries.empty()) {
  132. new_entries.resize(0);
  133. for (auto root : entries) {
  134. // Find all entries below this node
  135. for (auto i = wildcard_index.lower_bound(root); i != wildcard_index.end() && i->parent_id == root; ++i) {
  136. // Should we ignore system matches
  137. if (!ignore_system || i->name.empty() || i->name[0] != '$') {
  138. if (i->value) {
  139. callback(*i->value);
  140. }
  141. new_entries.push_back(i->id);
  142. }
  143. }
  144. }
  145. // Ignore system only on first level
  146. ignore_system = false;
  147. std::swap(entries, new_entries);
  148. }
  149. }
  150. // Find all topics that match the specified topic filter
  151. template<typename Output>
  152. void find_match(string_view topic_filter, Output&& callback) const {
  153. std::deque<direct_const_iterator> entries;
  154. entries.push_back(root);
  155. std::deque<direct_const_iterator> new_entries;
  156. topic_filter_tokenizer(
  157. topic_filter,
  158. [this, &entries, &new_entries, &callback](string_view t) {
  159. auto const& direct_index = map.template get<direct_index_tag>();
  160. auto const& wildcard_index = map.template get<wildcard_index_tag>();
  161. new_entries.resize(0);
  162. for (auto const& entry : entries) {
  163. node_id_t parent = entry->id;
  164. if (t == string_view("+")) {
  165. for (auto i = wildcard_index.lower_bound(parent); i != wildcard_index.end() && i->parent_id == parent; ++i) {
  166. if (parent != root_node_id || i->name.empty() || i->name[0] != '$') {
  167. new_entries.push_back(map.template project<direct_index_tag, wildcard_const_iterator>(i));
  168. }
  169. else {
  170. break;
  171. }
  172. }
  173. }
  174. else if (t == string_view("#")) {
  175. match_hash_entries(parent, callback, parent == root_node_id);
  176. return false;
  177. }
  178. else {
  179. direct_const_iterator i = direct_index.find(std::make_tuple(parent, t));
  180. if (i != direct_index.end()) {
  181. new_entries.push_back(i);
  182. }
  183. }
  184. }
  185. std::swap(new_entries, entries);
  186. return !entries.empty();
  187. }
  188. );
  189. for (auto& entry : entries) {
  190. if (entry->value) {
  191. callback(*entry->value);
  192. }
  193. }
  194. }
  195. // Remove a value at the specified topic
  196. size_t erase_topic(string_view topic) {
  197. auto path = find_topic(topic);
  198. // Reset the value if there is actually something stored
  199. if (!path.empty() && path.back()->value) {
  200. auto& direct_index = map.template get<direct_index_tag>();
  201. direct_index.modify(path.back(), [](path_entry &entry){ entry.value = nullopt; });
  202. // Do iterators stay valid when erasing ? I think they do ?
  203. for (auto entry : path) {
  204. direct_index.modify(entry, [](path_entry& entry){ entry.decrease_count(); });
  205. if (entry->count == 0) {
  206. map.erase(entry);
  207. }
  208. }
  209. return 1;
  210. }
  211. return 0;
  212. }
  213. // Increase the number of topics for this path
  214. void increase_topics(std::vector<direct_const_iterator> const &path) {
  215. auto& direct_index = map.template get<direct_index_tag>();
  216. for(auto& i : path) {
  217. direct_index.modify(i, [](path_entry& entry){ entry.increase_count(); });
  218. }
  219. }
  220. // Increase the map size (total number of topics stored)
  221. void increase_map_size() {
  222. if(map_size == std::numeric_limits<decltype(map_size)>::max()) {
  223. throw_max_stored_topics();
  224. }
  225. ++map_size;
  226. }
  227. // Decrease the map size (total number of topics stored)
  228. void decrease_map_size(size_t count) {
  229. BOOST_ASSERT(map_size >= count);
  230. map_size -= count;
  231. }
  232. void init_map() {
  233. map_size = 0;
  234. // Create the root node
  235. root = map.insert(path_entry(root_parent_id, "", root_node_id)).first;
  236. next_node_id = root_node_id + 1;
  237. }
  238. public:
  239. retained_topic_map()
  240. {
  241. init_map();
  242. }
  243. // Insert a value at the specified topic
  244. template<typename V>
  245. std::size_t insert_or_assign(string_view topic, V&& value) {
  246. auto& direct_index = map.template get<direct_index_tag>();
  247. auto path = this->find_topic(topic);
  248. if (path.empty()) {
  249. auto new_topic = this->create_topic(topic);
  250. direct_index.modify(new_topic, [&value](path_entry &entry) mutable { entry.value.emplace(std::forward<V>(value)); });
  251. increase_map_size();
  252. return 1;
  253. }
  254. if (!path.back()->value) {
  255. this->increase_topics(path);
  256. direct_index.modify(path.back(), [&value](path_entry &entry) mutable { entry.value.emplace(std::forward<V>(value)); });
  257. increase_map_size();
  258. return 1;
  259. }
  260. direct_index.modify(path.back(), [&value](path_entry &entry) mutable { entry.value.emplace(std::forward<V>(value)); });
  261. return 0;
  262. }
  263. // Find all stored topics that math the specified topic_filter
  264. template<typename Output>
  265. void find(string_view topic_filter, Output&& callback) const {
  266. find_match(topic_filter, std::forward<Output>(callback));
  267. }
  268. // Remove a stored value at the specified topic
  269. std::size_t erase(string_view topic) {
  270. auto result = erase_topic(topic);
  271. decrease_map_size(result);
  272. return result;
  273. }
  274. // Get the number of entries stored in the map
  275. std::size_t size() const { return map_size; }
  276. // Get the number of entries in the map (for debugging purpose only)
  277. std::size_t internal_size() const { return map.size(); }
  278. // Clear all topics
  279. void clear() {
  280. map.clear();
  281. init_map();
  282. }
  283. // Dump debug information
  284. template<typename Output>
  285. void dump(Output &out) {
  286. auto const& direct_index = map.template get<direct_index_tag>();
  287. for (auto const& i : direct_index) {
  288. out << i.parent_id << " " << i.name << " " << (i.value ? "init" : "-") << " " << i.count << '\n';
  289. }
  290. }
  291. };
  292. MQTT_BROKER_NS_END
  293. #endif // MQTT_BROKER_RETAINED_TOPIC_MAP_HPP