shared_target_impl.hpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright Takatoshi Kondo 2020
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_BROKER_SHARED_TARGET_IMPL_HPP)
  7. #define MQTT_BROKER_SHARED_TARGET_IMPL_HPP
  8. #include <mqtt/broker/shared_target.hpp>
  9. #include <mqtt/broker/session_state.hpp>
  10. MQTT_BROKER_NS_BEGIN
  11. inline void shared_target::insert(buffer share_name, buffer topic_filter, session_state& ss) {
  12. std::lock_guard<mutex> g{mtx_targets_};
  13. auto& idx = targets_.get<tag_cid_sn>();
  14. auto it = idx.lower_bound(std::make_tuple(ss.client_id(), share_name));
  15. if (it == idx.end() || (it->share_name != share_name || it->client_id() != ss.client_id())) {
  16. it = idx.emplace_hint(it, force_move(share_name), ss, std::chrono::steady_clock::now());
  17. // const_cast is appropriate here
  18. // See https://github.com/boostorg/multi_index/issues/50
  19. auto& st = const_cast<entry&>(*it);
  20. bool inserted;
  21. std::tie(std::ignore, inserted) = st.topic_filters.insert(force_move(topic_filter));
  22. BOOST_ASSERT(inserted);
  23. }
  24. else {
  25. // entry exists
  26. // const_cast is appropriate here
  27. // See https://github.com/boostorg/multi_index/issues/50
  28. auto& st = const_cast<entry&>(*it);
  29. st.topic_filters.insert(force_move(topic_filter)); // ignore overwrite
  30. }
  31. }
  32. inline void shared_target::erase(buffer share_name, buffer topic_filter, session_state const& ss) {
  33. std::lock_guard<mutex> g{mtx_targets_};
  34. auto& idx = targets_.get<tag_cid_sn>();
  35. auto it = idx.find(std::make_tuple(ss.client_id(), share_name));
  36. if (it == idx.end()) {
  37. MQTT_LOG("mqtt_broker", warning)
  38. << "attempt to erase non exist entry"
  39. << " share_name:" << share_name
  40. << " topic_filtere:" << topic_filter
  41. << " client_id:" << ss.client_id();
  42. return;
  43. }
  44. // entry exists
  45. // const_cast is appropriate here
  46. // See https://github.com/boostorg/multi_index/issues/50
  47. auto& st = const_cast<entry&>(*it);
  48. st.topic_filters.erase(topic_filter);
  49. if (it->topic_filters.empty()) {
  50. idx.erase(it);
  51. }
  52. }
  53. inline void shared_target::erase(session_state const& ss) {
  54. std::lock_guard<mutex> g{mtx_targets_};
  55. auto& idx = targets_.get<tag_cid_sn>();
  56. auto r = idx.equal_range(ss.client_id());
  57. idx.erase(r.first, r.second);
  58. }
  59. inline optional<session_state_ref> shared_target::get_target(buffer const& share_name, buffer const& topic_filter) {
  60. std::lock_guard<mutex> g{mtx_targets_};
  61. // get share_name matched range ordered by timestamp (ascending)
  62. auto& idx = targets_.get<tag_sn_tp>();
  63. auto r = idx.equal_range(share_name);
  64. for (; r.first != r.second; ++r.first) {
  65. auto const& elem = *r.first;
  66. auto it = elem.topic_filters.find(topic_filter);
  67. // no share_name/topic_filter matched
  68. if (it == elem.topic_filters.end()) continue;
  69. // matched
  70. // update timestamp (timestamp is key)
  71. idx.modify(r.first, [](auto& e) { e.tp = std::chrono::steady_clock::now(); });
  72. return elem.ssr;
  73. }
  74. return nullopt;
  75. }
  76. inline shared_target::entry::entry(
  77. buffer share_name,
  78. session_state& ss,
  79. time_point_t tp)
  80. : share_name { force_move(share_name) },
  81. ssr { ss },
  82. tp { force_move(tp) }
  83. {}
  84. inline buffer const& shared_target::entry::client_id() const {
  85. return ssr.get().client_id();
  86. }
  87. MQTT_BROKER_NS_END
  88. #endif // MQTT_BROKER_SHARED_TARGET_IMPL_HPP