shared_subscriptions.hpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright Takatoshi Kondo 2020
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_SHARED_SUBSCRIPTIONS_HPP)
  7. #define MQTT_SHARED_SUBSCRIPTIONS_HPP
  8. #include <mqtt/config.hpp>
  9. #include <utility>
  10. #include <type_traits>
  11. #include <boost/assert.hpp>
  12. #include <mqtt/namespace.hpp>
  13. #include <mqtt/buffer.hpp>
  14. #include <mqtt/move.hpp>
  15. #include <mqtt/optional.hpp>
  16. namespace MQTT_NS {
  17. struct share_name_topic_filter {
  18. share_name_topic_filter(buffer share_name, buffer topic_filter)
  19. : share_name { force_move(share_name) }, topic_filter{ force_move(topic_filter) }
  20. {
  21. BOOST_ASSERT(!topic_filter.empty());
  22. }
  23. buffer share_name;
  24. buffer topic_filter;
  25. };
  26. inline bool operator<(share_name_topic_filter const& lhs, share_name_topic_filter const& rhs) {
  27. if (lhs.share_name < rhs.share_name) return true;
  28. if (rhs.share_name < lhs.share_name) return false;
  29. return lhs.topic_filter < rhs.topic_filter;
  30. }
  31. inline bool operator==(share_name_topic_filter const& lhs, share_name_topic_filter const& rhs) {
  32. return lhs.share_name == rhs.share_name && lhs.topic_filter == rhs.topic_filter;
  33. }
  34. inline bool operator!=(share_name_topic_filter const& lhs, share_name_topic_filter const& rhs) {
  35. return !(lhs == rhs);
  36. }
  37. inline optional<share_name_topic_filter> parse_shared_subscription(buffer whole_topic_filter) {
  38. auto const shared_prefix = string_view("$share/");
  39. if (whole_topic_filter.substr(0, shared_prefix.size()) != shared_prefix) {
  40. return share_name_topic_filter{ buffer{}, force_move(whole_topic_filter) };
  41. }
  42. // Remove $share/
  43. whole_topic_filter.remove_prefix(shared_prefix.size());
  44. // This is the '/' seperating the subscription group from the actual topic_filter.
  45. auto const idx = whole_topic_filter.find_first_of('/');
  46. if (idx == string_view::npos) return nullopt;
  47. // We return the share_name and the topic_filter as buffers that point to the same
  48. // storage. So we grab the substr for "share", and then remove it from whole_topic_filter.
  49. auto share_name = whole_topic_filter.substr(0, idx);
  50. whole_topic_filter.remove_prefix(std::min(idx + 1, whole_topic_filter.size()));
  51. if (share_name.empty() || whole_topic_filter.empty()) return nullopt;
  52. return share_name_topic_filter{ force_move(share_name), force_move(whole_topic_filter) };
  53. }
  54. namespace detail {
  55. template <typename T, typename U>
  56. inline buffer create_topic_filter_buffer(T const& share_name, U const& topic_filter) {
  57. string_view prefix = "$share/";
  58. // 1 is the length of '/' between share_name and topic_filter
  59. auto spa = make_shared_ptr_array(prefix.size() + share_name.size() + 1 + topic_filter.size());
  60. auto it = spa.get();
  61. auto start = it;
  62. std::copy(prefix.begin(), prefix.end(), it);
  63. it += prefix.size();
  64. std::copy(share_name.begin(), share_name.end(), it);
  65. it += share_name.size();
  66. *it++ = '/';
  67. std::copy(topic_filter.begin(), topic_filter.end(), it);
  68. it += topic_filter.size();
  69. return buffer(string_view(start, static_cast<std::size_t>(it - start)), force_move(spa));
  70. }
  71. } // namespace detail
  72. inline buffer create_topic_filter_buffer(string_view share_name, string_view topic_filter) {
  73. if (share_name.empty()) return allocate_buffer(topic_filter);
  74. return detail::create_topic_filter_buffer(share_name, topic_filter);
  75. }
  76. inline buffer create_topic_filter_buffer(string_view share_name, buffer topic_filter) {
  77. if (share_name.empty()) return topic_filter;
  78. return detail::create_topic_filter_buffer(share_name, topic_filter);
  79. }
  80. } // namespace MQTT_NS
  81. #endif // MQTT_SHARED_SUBSCRIPTIONS_HPP