message.hpp 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123
  1. // Copyright Takatoshi Kondo 2018
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_MESSAGE_HPP)
  7. #define MQTT_MESSAGE_HPP
  8. #include <string>
  9. #include <vector>
  10. #include <memory>
  11. #include <algorithm>
  12. #include <numeric>
  13. #include <boost/asio/buffer.hpp>
  14. #include <boost/container/static_vector.hpp>
  15. #include <boost/numeric/conversion/cast.hpp>
  16. #include <mqtt/namespace.hpp>
  17. #include <mqtt/two_byte_util.hpp>
  18. #include <mqtt/fixed_header.hpp>
  19. #include <mqtt/remaining_length.hpp>
  20. #include <mqtt/subscribe_options.hpp>
  21. #include <mqtt/const_buffer_util.hpp>
  22. #include <mqtt/will.hpp>
  23. #include <mqtt/connect_flags.hpp>
  24. #include <mqtt/publish.hpp>
  25. #include <mqtt/exception.hpp>
  26. #include <mqtt/utf8encoded_strings.hpp>
  27. #include <mqtt/four_byte_util.hpp>
  28. #include <mqtt/packet_id_type.hpp>
  29. #include <mqtt/optional.hpp>
  30. #include <mqtt/string_view.hpp>
  31. #include <mqtt/property.hpp>
  32. #include <mqtt/string_check.hpp>
  33. #include <mqtt/move.hpp>
  34. #include <mqtt/reason_code.hpp>
  35. #include <mqtt/connect_return_code.hpp>
  36. #include <mqtt/publish.hpp>
  37. namespace MQTT_NS {
  38. namespace as = boost::asio;
  39. inline namespace v3_1_1 {
  40. namespace detail_v3_1_1 {
  41. class header_only_message {
  42. public:
  43. /**
  44. * @brief Create empty header_packet_id_message.
  45. */
  46. header_only_message(control_packet_type type, std::uint8_t flags)
  47. : message_ { static_cast<char>(make_fixed_header(type, flags)), 0 }
  48. {}
  49. /**
  50. * @brief Create const buffer sequence
  51. * it is for boost asio APIs
  52. * @return const buffer sequence
  53. */
  54. std::vector<as::const_buffer> const_buffer_sequence() const {
  55. return { as::buffer(message_.data(), message_.size()) };
  56. }
  57. /**
  58. * @brief Get whole size of sequence
  59. * @return whole size
  60. */
  61. std::size_t size() const {
  62. return message_.size();
  63. }
  64. /**
  65. * @brief Get number of element of const_buffer_sequence
  66. * @return number of element of const_buffer_sequence
  67. */
  68. static constexpr std::size_t num_of_const_buffer_sequence() {
  69. return 1;
  70. }
  71. /**
  72. * @brief Create one continuours buffer.
  73. * All sequence of buffers are concatinated.
  74. * It is useful to store to file/database.
  75. * @return continuous buffer
  76. */
  77. std::string continuous_buffer() const {
  78. return std::string(message_.data(), size());
  79. }
  80. private:
  81. boost::container::static_vector<char, 2> message_;
  82. };
  83. template <std::size_t PacketIdBytes>
  84. class basic_header_packet_id_message;
  85. template <std::size_t PacketIdBytes>
  86. class basic_header_packet_id_message {
  87. public:
  88. /**
  89. * @brief Create empty header_packet_id_message.
  90. */
  91. basic_header_packet_id_message(control_packet_type type, std::uint8_t flags, typename packet_id_type<PacketIdBytes>::type packet_id)
  92. : message_ { static_cast<char>(make_fixed_header(type, flags)), PacketIdBytes }
  93. {
  94. add_packet_id_to_buf<PacketIdBytes>::apply(message_, packet_id);
  95. }
  96. template <typename Iterator>
  97. basic_header_packet_id_message(Iterator b, Iterator e) {
  98. if (std::distance(b, e) != 2 + PacketIdBytes) throw remaining_length_error();
  99. if (b[1] != PacketIdBytes) throw remaining_length_error();
  100. std::copy(b, e, std::back_inserter(message_));
  101. }
  102. /**
  103. * @brief Create const buffer sequence
  104. * it is for boost asio APIs
  105. * @return const buffer sequence
  106. */
  107. std::vector<as::const_buffer> const_buffer_sequence() const {
  108. return { as::buffer(message_.data(), size()) };
  109. }
  110. /**
  111. * @brief Get whole size of sequence
  112. * @return whole size
  113. */
  114. std::size_t size() const {
  115. return message_.size();
  116. }
  117. /**
  118. * @brief Get number of element of const_buffer_sequence
  119. * @return number of element of const_buffer_sequence
  120. */
  121. static constexpr std::size_t num_of_const_buffer_sequence() {
  122. return 1;
  123. }
  124. /**
  125. * @brief Create one continuours buffer.
  126. * All sequence of buffers are concatinated.
  127. * It is useful to store to file/database.
  128. * @return continuous buffer
  129. */
  130. std::string continuous_buffer() const {
  131. return std::string(message_.data(), size());
  132. }
  133. protected:
  134. boost::container::static_vector<char, 2 + PacketIdBytes> const& message() const {
  135. return message_;
  136. }
  137. private:
  138. boost::container::static_vector<char, 2 + PacketIdBytes> message_;
  139. };
  140. } // namespace detail_v3_1_1
  141. template <std::size_t PacketIdBytes>
  142. struct basic_puback_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
  143. using base = detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes>;
  144. basic_puback_message(typename packet_id_type<PacketIdBytes>::type packet_id)
  145. : base(control_packet_type::puback, 0b0000, packet_id)
  146. {}
  147. };
  148. using puback_message = basic_puback_message<2>;
  149. template <std::size_t PacketIdBytes>
  150. struct basic_pubrec_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
  151. using base = detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes>;
  152. basic_pubrec_message(typename packet_id_type<PacketIdBytes>::type packet_id)
  153. : base(control_packet_type::pubrec, 0b0000, packet_id)
  154. {}
  155. };
  156. using pubrec_message = basic_pubrec_message<2>;
  157. template <std::size_t PacketIdBytes>
  158. struct basic_pubrel_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
  159. using base = detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes>;
  160. basic_pubrel_message(typename packet_id_type<PacketIdBytes>::type packet_id)
  161. : base(control_packet_type::pubrel, 0b0010, packet_id)
  162. {
  163. }
  164. basic_pubrel_message(string_view buf)
  165. : base(buf.begin(), buf.end())
  166. {
  167. }
  168. /**
  169. * @brief Get packet id
  170. * @return packet_id
  171. */
  172. typename packet_id_type<PacketIdBytes>::type packet_id() const {
  173. return make_packet_id<PacketIdBytes>::apply(std::next(base::message().begin(), 2), base::message().end());
  174. }
  175. };
  176. using pubrel_message = basic_pubrel_message<2>;
  177. using pubrel_32_message = basic_pubrel_message<4>;
  178. template <std::size_t PacketIdBytes>
  179. struct basic_pubcomp_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
  180. basic_pubcomp_message(typename packet_id_type<PacketIdBytes>::type packet_id)
  181. : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes>(control_packet_type::pubcomp, 0b0000, packet_id)
  182. {}
  183. };
  184. using pubcomp_message = basic_pubcomp_message<2>;
  185. template <std::size_t PacketIdBytes>
  186. struct basic_unsuback_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
  187. basic_unsuback_message(typename packet_id_type<PacketIdBytes>::type packet_id)
  188. : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes>(control_packet_type::unsuback, 0b0000, packet_id)
  189. {}
  190. };
  191. using unsuback_message = basic_unsuback_message<2>;
  192. struct pingreq_message : detail_v3_1_1::header_only_message {
  193. pingreq_message()
  194. : detail_v3_1_1::header_only_message(control_packet_type::pingreq, 0b0000)
  195. {}
  196. };
  197. struct pingresp_message : detail_v3_1_1::header_only_message {
  198. pingresp_message()
  199. : detail_v3_1_1::header_only_message(control_packet_type::pingresp, 0b0000)
  200. {}
  201. };
  202. struct disconnect_message : detail_v3_1_1::header_only_message {
  203. disconnect_message()
  204. : detail_v3_1_1::header_only_message(control_packet_type::disconnect, 0b0000)
  205. {}
  206. };
  207. class connack_message {
  208. public:
  209. connack_message(bool session_present, connect_return_code return_code)
  210. : message_ {
  211. static_cast<char>(make_fixed_header(control_packet_type::connack, 0b0000)),
  212. 0b0010,
  213. static_cast<char>(session_present ? 1 : 0),
  214. static_cast<char>(return_code)
  215. }
  216. {}
  217. /**
  218. * @brief Create const buffer sequence
  219. * it is for boost asio APIs
  220. * @return const buffer sequence
  221. */
  222. std::vector<as::const_buffer> const_buffer_sequence() const {
  223. return { as::buffer(message_.data(), size()) };
  224. }
  225. /**
  226. * @brief Get whole size of sequence
  227. * @return whole size
  228. */
  229. std::size_t size() const {
  230. return message_.size();
  231. }
  232. /**
  233. * @brief Get number of element of const_buffer_sequence
  234. * @return number of element of const_buffer_sequence
  235. */
  236. static constexpr std::size_t num_of_const_buffer_sequence() {
  237. return 1;
  238. }
  239. /**
  240. * @brief Create one continuours buffer.
  241. * All sequence of buffers are concatinated.
  242. * It is useful to store to file/database.
  243. * @return continuous buffer
  244. */
  245. std::string continuous_buffer() const {
  246. return std::string(message_.data(), size());
  247. }
  248. private:
  249. boost::container::static_vector<char, 4> message_;
  250. };
  251. // variable length messages
  252. class connect_message {
  253. public:
  254. connect_message(
  255. std::uint16_t keep_alive_sec,
  256. buffer client_id,
  257. bool clean_session,
  258. optional<will> w,
  259. optional<buffer> user_name,
  260. optional<buffer> password
  261. )
  262. : fixed_header_(static_cast<char>(make_fixed_header(control_packet_type::connect, 0b0000))),
  263. connect_flags_(0),
  264. // protocol name length, protocol name, protocol level, connect flag, client id length, client id, keep alive
  265. remaining_length_(
  266. 2 + // protocol name length
  267. 4 + // protocol name
  268. 1 + // protocol level
  269. 1 + // connect flag
  270. 2 + // keep alive
  271. 2 + // client id length
  272. client_id.size() // client id
  273. ),
  274. protocol_name_and_level_ { 0x00, 0x04, 'M', 'Q', 'T', 'T', 0x04 },
  275. client_id_(force_move(client_id)),
  276. client_id_length_buf_{ num_to_2bytes(boost::numeric_cast<std::uint16_t>(client_id_.size())) },
  277. keep_alive_buf_ { num_to_2bytes(keep_alive_sec) }
  278. {
  279. utf8string_check(client_id_);
  280. if (clean_session) connect_flags_ |= connect_flags::clean_session;
  281. if (user_name) {
  282. utf8string_check(user_name.value());
  283. connect_flags_ |= connect_flags::user_name_flag;
  284. user_name_ = force_move(user_name.value());
  285. add_uint16_t_to_buf(user_name_length_buf_, boost::numeric_cast<std::uint16_t>(user_name_.size()));
  286. remaining_length_ += 2 + user_name_.size();
  287. }
  288. if (password) {
  289. connect_flags_ |= connect_flags::password_flag;
  290. password_ = force_move(password.value());
  291. add_uint16_t_to_buf(password_length_buf_, boost::numeric_cast<std::uint16_t>(password_.size()));
  292. remaining_length_ += 2 + password_.size();
  293. }
  294. if (w) {
  295. connect_flags_ |= connect_flags::will_flag;
  296. if (w.value().get_retain() == retain::yes) connect_flags_ |= connect_flags::will_retain;
  297. connect_flags::set_will_qos(connect_flags_, w.value().get_qos());
  298. utf8string_check(w.value().topic());
  299. will_topic_name_ = force_move(w.value().topic());
  300. add_uint16_t_to_buf(
  301. will_topic_name_length_buf_,
  302. boost::numeric_cast<std::uint16_t>(will_topic_name_.size())
  303. );
  304. if (w.value().message().size() > 0xffffL) throw will_message_length_error();
  305. will_message_ = force_move(w.value().message());
  306. add_uint16_t_to_buf(
  307. will_message_length_buf_,
  308. boost::numeric_cast<std::uint16_t>(will_message_.size()));
  309. remaining_length_ += 2 + will_topic_name_.size() + 2 + will_message_.size();
  310. }
  311. auto rb = remaining_bytes(remaining_length_);
  312. for (auto e : rb) {
  313. remaining_length_buf_.push_back(e);
  314. }
  315. }
  316. /**
  317. * @brief Create const buffer sequence
  318. * it is for boost asio APIs
  319. * @return const buffer sequence
  320. */
  321. std::vector<as::const_buffer> const_buffer_sequence() const {
  322. std::vector<as::const_buffer> ret;
  323. ret.reserve(num_of_const_buffer_sequence());
  324. ret.emplace_back(as::buffer(&fixed_header_, 1));
  325. ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
  326. ret.emplace_back(as::buffer(protocol_name_and_level_.data(), protocol_name_and_level_.size()));
  327. ret.emplace_back(as::buffer(&connect_flags_, 1));
  328. ret.emplace_back(as::buffer(keep_alive_buf_.data(), keep_alive_buf_.size()));
  329. ret.emplace_back(as::buffer(client_id_length_buf_.data(), client_id_length_buf_.size()));
  330. ret.emplace_back(as::buffer(client_id_));
  331. if (connect_flags::has_will_flag(connect_flags_)) {
  332. ret.emplace_back(as::buffer(will_topic_name_length_buf_.data(), will_topic_name_length_buf_.size()));
  333. ret.emplace_back(as::buffer(will_topic_name_));
  334. ret.emplace_back(as::buffer(will_message_length_buf_.data(), will_message_length_buf_.size()));
  335. ret.emplace_back(as::buffer(will_message_));
  336. }
  337. if (connect_flags::has_user_name_flag(connect_flags_)) {
  338. ret.emplace_back(as::buffer(user_name_length_buf_.data(), user_name_length_buf_.size()));
  339. ret.emplace_back(as::buffer(user_name_));
  340. }
  341. if (connect_flags::has_password_flag(connect_flags_)) {
  342. ret.emplace_back(as::buffer(password_length_buf_.data(), password_length_buf_.size()));
  343. ret.emplace_back(as::buffer(password_));
  344. }
  345. return ret;
  346. }
  347. /**
  348. * @brief Get whole size of sequence
  349. * @return whole size
  350. */
  351. std::size_t size() const {
  352. return
  353. 1 + // fixed header
  354. remaining_length_buf_.size() +
  355. remaining_length_;
  356. }
  357. /**
  358. * @brief Get number of element of const_buffer_sequence
  359. * @return number of element of const_buffer_sequence
  360. */
  361. static constexpr std::size_t num_of_const_buffer_sequence() {
  362. return
  363. 1 + // fixed header
  364. 1 + // remaining length
  365. 1 + // protocol name and level
  366. 1 + // connect flags
  367. 1 + // keep alive
  368. 2 + // client id length, client id
  369. 2 + // will topic name length, will topic name
  370. 2 + // will message length, will message
  371. 2 + // user name length, user name
  372. 2; // password length, password
  373. }
  374. /**
  375. * @brief Create one continuours buffer.
  376. * All sequence of buffers are concatinated.
  377. * It is useful to store to file/database.
  378. * @return continuous buffer
  379. */
  380. std::string continuous_buffer() const {
  381. std::string ret;
  382. ret.reserve(size());
  383. ret.push_back(static_cast<char>(fixed_header_));
  384. ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
  385. ret.append(protocol_name_and_level_.data(), protocol_name_and_level_.size());
  386. ret.push_back(connect_flags_);
  387. ret.append(keep_alive_buf_.data(), keep_alive_buf_.size());
  388. ret.append(client_id_length_buf_.data(), client_id_length_buf_.size());
  389. ret.append(client_id_.data(), client_id_.size());
  390. if (connect_flags::has_will_flag(connect_flags_)) {
  391. ret.append(will_topic_name_length_buf_.data(), will_topic_name_length_buf_.size());
  392. ret.append(will_topic_name_.data(), will_topic_name_.size());
  393. ret.append(will_message_length_buf_.data(), will_message_length_buf_.size());
  394. ret.append(will_message_.data(), will_message_.size());
  395. }
  396. if (connect_flags::has_user_name_flag(connect_flags_)) {
  397. ret.append(user_name_length_buf_.data(), user_name_length_buf_.size());
  398. ret.append(user_name_.data(), user_name_.size());
  399. }
  400. if (connect_flags::has_password_flag(connect_flags_)) {
  401. ret.append(password_length_buf_.data(), password_length_buf_.size());
  402. ret.append(password_.data(), password_.size());
  403. }
  404. return ret;
  405. }
  406. private:
  407. std::uint8_t fixed_header_;
  408. char connect_flags_;
  409. std::size_t remaining_length_;
  410. boost::container::static_vector<char, 4> remaining_length_buf_;
  411. boost::container::static_vector<char, 7> protocol_name_and_level_;
  412. buffer client_id_;
  413. boost::container::static_vector<char, 2> client_id_length_buf_;
  414. buffer will_topic_name_;
  415. boost::container::static_vector<char, 2> will_topic_name_length_buf_;
  416. buffer will_message_;
  417. boost::container::static_vector<char, 2> will_message_length_buf_;
  418. buffer user_name_;
  419. boost::container::static_vector<char, 2> user_name_length_buf_;
  420. buffer password_;
  421. boost::container::static_vector<char, 2> password_length_buf_;
  422. boost::container::static_vector<char, 2> keep_alive_buf_;
  423. };
  424. template <std::size_t PacketIdBytes>
  425. class basic_publish_message {
  426. public:
  427. template <
  428. typename ConstBufferSequence,
  429. typename std::enable_if<
  430. as::is_const_buffer_sequence<ConstBufferSequence>::value,
  431. std::nullptr_t
  432. >::type = nullptr
  433. >
  434. basic_publish_message(
  435. typename packet_id_type<PacketIdBytes>::type packet_id,
  436. as::const_buffer topic_name,
  437. ConstBufferSequence payloads,
  438. publish_options pubopts
  439. )
  440. : fixed_header_(make_fixed_header(control_packet_type::publish, 0b0000) | pubopts.operator std::uint8_t()),
  441. topic_name_(topic_name),
  442. topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast<std::uint16_t>(topic_name.size())) },
  443. remaining_length_(
  444. 2 // topic name length
  445. + topic_name_.size() // topic name
  446. + ( (pubopts.get_qos() == qos::at_least_once || pubopts.get_qos() == qos::exactly_once)
  447. ? PacketIdBytes // packet_id
  448. : 0)
  449. )
  450. {
  451. auto b = as::buffer_sequence_begin(payloads);
  452. auto e = as::buffer_sequence_end(payloads);
  453. auto num_of_payloads = static_cast<std::size_t>(std::distance(b, e));
  454. payloads_.reserve(num_of_payloads);
  455. for (; b != e; ++b) {
  456. auto const& payload = *b;
  457. remaining_length_ += payload.size();
  458. payloads_.push_back(payload);
  459. }
  460. utf8string_check(topic_name_);
  461. auto rb = remaining_bytes(remaining_length_);
  462. for (auto e : rb) {
  463. remaining_length_buf_.push_back(e);
  464. }
  465. if (pubopts.get_qos() == qos::at_least_once ||
  466. pubopts.get_qos() == qos::exactly_once) {
  467. packet_id_.reserve(PacketIdBytes);
  468. add_packet_id_to_buf<PacketIdBytes>::apply(packet_id_, packet_id);
  469. }
  470. }
  471. // Used in test code, and to deserialize stored messages.
  472. basic_publish_message(buffer buf) {
  473. if (buf.empty()) throw remaining_length_error();
  474. fixed_header_ = static_cast<std::uint8_t>(buf.front());
  475. qos qos_value = get_qos();
  476. buf.remove_prefix(1);
  477. if (buf.empty()) throw remaining_length_error();
  478. auto len_consumed = remaining_length(buf.begin(), buf.end());
  479. remaining_length_ = std::get<0>(len_consumed);
  480. auto consumed = std::get<1>(len_consumed);
  481. std::copy(
  482. buf.begin(),
  483. std::next(buf.begin(), static_cast<string_view::difference_type>(consumed)),
  484. std::back_inserter(remaining_length_buf_));
  485. buf.remove_prefix(consumed);
  486. if (buf.size() < 2) throw remaining_length_error();
  487. std::copy(buf.begin(), std::next(buf.begin(), 2), std::back_inserter(topic_name_length_buf_));
  488. auto topic_name_length = make_uint16_t(topic_name_length_buf_.begin(), topic_name_length_buf_.end());
  489. buf.remove_prefix(2);
  490. if (buf.size() < topic_name_length) throw remaining_length_error();
  491. topic_name_ = as::buffer(buf.substr(0, topic_name_length));
  492. utf8string_check(topic_name_);
  493. buf.remove_prefix(topic_name_length);
  494. switch (qos_value) {
  495. case qos::at_most_once:
  496. break;
  497. case qos::at_least_once:
  498. case qos::exactly_once:
  499. if (buf.size() < PacketIdBytes) throw remaining_length_error();
  500. std::copy(buf.begin(), std::next(buf.begin(), PacketIdBytes), std::back_inserter(packet_id_));
  501. buf.remove_prefix(PacketIdBytes);
  502. break;
  503. default:
  504. throw protocol_error();
  505. break;
  506. };
  507. if (!buf.empty()) {
  508. payloads_.emplace_back(as::buffer(buf));
  509. }
  510. }
  511. /**
  512. * @brief Create const buffer sequence
  513. * it is for boost asio APIs
  514. * @return const buffer sequence
  515. */
  516. std::vector<as::const_buffer> const_buffer_sequence() const {
  517. std::vector<as::const_buffer> ret;
  518. ret.reserve(num_of_const_buffer_sequence());
  519. ret.emplace_back(as::buffer(&fixed_header_, 1));
  520. ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
  521. ret.emplace_back(as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()));
  522. ret.emplace_back(as::buffer(topic_name_));
  523. if (!packet_id_.empty()) {
  524. ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
  525. }
  526. std::copy(payloads_.begin(), payloads_.end(), std::back_inserter(ret));
  527. return ret;
  528. }
  529. /**
  530. * @brief Get whole size of sequence
  531. * @return whole size
  532. */
  533. std::size_t size() const {
  534. return
  535. 1 + // fixed header
  536. remaining_length_buf_.size() +
  537. remaining_length_;
  538. }
  539. /**
  540. * @brief Get number of element of const_buffer_sequence
  541. * @return number of element of const_buffer_sequence
  542. */
  543. std::size_t num_of_const_buffer_sequence() const {
  544. return
  545. 1 + // fixed header
  546. 1 + // remaining length
  547. 2 + // topic name length, topic name
  548. (packet_id_.empty() ? 0 : 1) + // packet_id
  549. payloads_.size();
  550. }
  551. /**
  552. * @brief Create one continuous buffer.
  553. * All sequence of buffers are concatinated.
  554. * It is useful to store to file/database.
  555. * @return continuous buffer
  556. */
  557. std::string continuous_buffer() const {
  558. std::string ret;
  559. ret.reserve(size());
  560. ret.push_back(static_cast<char>(fixed_header_));
  561. ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
  562. ret.append(topic_name_length_buf_.data(), topic_name_length_buf_.size());
  563. ret.append(get_pointer(topic_name_), get_size(topic_name_));
  564. ret.append(packet_id_.data(), packet_id_.size());
  565. for (auto const& payload : payloads_) {
  566. ret.append(get_pointer(payload), get_size(payload));
  567. }
  568. return ret;
  569. }
  570. /**
  571. * @brief Get packet id
  572. * @return packet_id
  573. */
  574. typename packet_id_type<PacketIdBytes>::type packet_id() const {
  575. return make_packet_id<PacketIdBytes>::apply(packet_id_.begin(), packet_id_.end());
  576. }
  577. /**
  578. * @brief Get publish_options
  579. * @return publish_options.
  580. */
  581. constexpr publish_options get_options() const {
  582. return publish_options(fixed_header_);
  583. }
  584. /**
  585. * @brief Get qos
  586. * @return qos
  587. */
  588. constexpr qos get_qos() const {
  589. return publish::get_qos(fixed_header_);
  590. }
  591. /**
  592. * @brief Check retain flag
  593. * @return true if retain, otherwise return false.
  594. */
  595. constexpr bool is_retain() const {
  596. return publish::is_retain(fixed_header_);
  597. }
  598. /**
  599. * @brief Check dup flag
  600. * @return true if dup, otherwise return false.
  601. */
  602. constexpr bool is_dup() const {
  603. return publish::is_dup(fixed_header_);
  604. }
  605. /**
  606. * @brief Get topic name
  607. * @return topic name
  608. */
  609. constexpr string_view topic() const {
  610. return string_view(get_pointer(topic_name_), get_size(topic_name_));
  611. }
  612. /**
  613. * @brief Get payload
  614. * @return payload
  615. */
  616. std::vector<string_view> payload() const {
  617. std::vector<string_view> ret;
  618. ret.reserve(payloads_.size());
  619. for (auto const& payload : payloads_) {
  620. ret.emplace_back(get_pointer(payload), get_size(payload));
  621. }
  622. return ret;
  623. }
  624. /**
  625. * @brief Get payload as single buffer
  626. * @return payload
  627. */
  628. buffer payload_as_buffer() const {
  629. auto size = std::accumulate(
  630. payloads_.begin(),
  631. payloads_.end(),
  632. std::size_t(0),
  633. [](std::size_t s, as::const_buffer const& payload) {
  634. return s += payload.size();
  635. }
  636. );
  637. if (size == 0) return buffer();
  638. auto spa = make_shared_ptr_array(size);
  639. auto ptr = spa.get();
  640. auto it = ptr;
  641. for (auto const& payload : payloads_) {
  642. auto b = get_pointer(payload);
  643. auto s = get_size(payload);
  644. auto e = b + s;
  645. std::copy(b, e, it);
  646. it += s;
  647. }
  648. return buffer(string_view(ptr, size), force_move(spa));
  649. }
  650. /**
  651. * @brief Set dup flag
  652. * @param dup flag value to set
  653. */
  654. constexpr void set_dup(bool dup) {
  655. publish::set_dup(fixed_header_, dup);
  656. }
  657. private:
  658. std::uint8_t fixed_header_;
  659. as::const_buffer topic_name_;
  660. boost::container::static_vector<char, 2> topic_name_length_buf_;
  661. boost::container::static_vector<char, PacketIdBytes> packet_id_;
  662. std::vector<as::const_buffer> payloads_;
  663. std::size_t remaining_length_;
  664. boost::container::static_vector<char, 4> remaining_length_buf_;
  665. };
  666. using publish_message = basic_publish_message<2>;
  667. using publish_32_message = basic_publish_message<4>;
  668. template <std::size_t PacketIdBytes>
  669. class basic_subscribe_message {
  670. private:
  671. struct entry {
  672. entry(as::const_buffer topic_name, subscribe_options qos_value)
  673. : topic_name_(topic_name),
  674. topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast<std::uint16_t>(topic_name_.size())) },
  675. qos_(qos_value.get_qos())
  676. {}
  677. as::const_buffer topic_name_;
  678. boost::container::static_vector<char, 2> topic_name_length_buf_;
  679. qos qos_;
  680. };
  681. public:
  682. basic_subscribe_message(
  683. std::vector<std::tuple<as::const_buffer, subscribe_options>> params,
  684. typename packet_id_type<PacketIdBytes>::type packet_id
  685. )
  686. : fixed_header_(make_fixed_header(control_packet_type::subscribe, 0b0010)),
  687. remaining_length_(PacketIdBytes)
  688. {
  689. add_packet_id_to_buf<PacketIdBytes>::apply(packet_id_, packet_id);
  690. // Check for errors before allocating.
  691. for (auto&& e : params) {
  692. as::const_buffer topic_name = std::get<0>(e);
  693. utf8string_check(topic_name);
  694. }
  695. entries_.reserve(params.size());
  696. for (auto&& e : params) {
  697. as::const_buffer topic_name = std::get<0>(e);
  698. size_t size = topic_name.size();
  699. entries_.emplace_back(topic_name, std::get<1>(e));
  700. remaining_length_ +=
  701. 2 + // topic name length
  702. size + // topic name
  703. 1; // means QoS
  704. }
  705. auto rb = remaining_bytes(remaining_length_);
  706. for (auto e : rb) {
  707. remaining_length_buf_.push_back(e);
  708. }
  709. }
  710. /**
  711. * @brief Create const buffer sequence
  712. * it is for boost asio APIs
  713. * @return const buffer sequence
  714. */
  715. std::vector<as::const_buffer> const_buffer_sequence() const {
  716. std::vector<as::const_buffer> ret;
  717. ret.reserve(num_of_const_buffer_sequence());
  718. ret.emplace_back(as::buffer(&fixed_header_, 1));
  719. ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
  720. ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
  721. for (auto const& e : entries_) {
  722. ret.emplace_back(as::buffer(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size()));
  723. ret.emplace_back(as::buffer(e.topic_name_));
  724. ret.emplace_back(as::buffer(&e.qos_, 1));
  725. }
  726. return ret;
  727. }
  728. /**
  729. * @brief Get whole size of sequence
  730. * @return whole size
  731. */
  732. std::size_t size() const {
  733. return
  734. 1 + // fixed header
  735. remaining_length_buf_.size() +
  736. remaining_length_;
  737. }
  738. /**
  739. * @brief Get number of element of const_buffer_sequence
  740. * @return number of element of const_buffer_sequence
  741. */
  742. std::size_t num_of_const_buffer_sequence() const {
  743. return
  744. 1 + // fixed header
  745. 1 + // remaining length
  746. 1 + // packet id
  747. entries_.size() * 3; // topic name length, topic name, qos
  748. }
  749. /**
  750. * @brief Create one continuours buffer.
  751. * All sequence of buffers are concatinated.
  752. * It is useful to store to file/database.
  753. * @return continuous buffer
  754. */
  755. std::string continuous_buffer() const {
  756. std::string ret;
  757. ret.reserve(size());
  758. ret.push_back(static_cast<char>(fixed_header_));
  759. ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
  760. ret.append(packet_id_.data(), packet_id_.size());
  761. for (auto const& e : entries_) {
  762. ret.append(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size());
  763. ret.append(get_pointer(e.topic_name_), get_size(e.topic_name_));
  764. ret.push_back(static_cast<char>(e.qos_));
  765. }
  766. return ret;
  767. }
  768. private:
  769. std::uint8_t fixed_header_;
  770. std::vector<entry> entries_;
  771. boost::container::static_vector<char, PacketIdBytes> packet_id_;
  772. std::size_t remaining_length_;
  773. boost::container::static_vector<char, 4> remaining_length_buf_;
  774. };
  775. using subscribe_message = basic_subscribe_message<2>;
  776. template <std::size_t PacketIdBytes>
  777. class basic_suback_message {
  778. public:
  779. basic_suback_message(
  780. std::vector<suback_return_code> params,
  781. typename packet_id_type<PacketIdBytes>::type packet_id
  782. )
  783. : fixed_header_(make_fixed_header(control_packet_type::suback, 0b0000)),
  784. remaining_length_(params.size() + PacketIdBytes)
  785. {
  786. add_packet_id_to_buf<PacketIdBytes>::apply(packet_id_, packet_id);
  787. auto rb = remaining_bytes(remaining_length_);
  788. for (auto e : rb) {
  789. remaining_length_buf_.push_back(e);
  790. }
  791. // TODO: We should be able to simply static-cast params.data() into a char*.
  792. entries_.reserve(params.size());
  793. for (auto e : params) {
  794. entries_.push_back(static_cast<char>(e));
  795. }
  796. }
  797. /**
  798. * @brief Create const buffer sequence
  799. * it is for boost asio APIs
  800. * @return const buffer sequence
  801. */
  802. std::vector<as::const_buffer> const_buffer_sequence() const {
  803. std::vector<as::const_buffer> ret;
  804. ret.reserve(num_of_const_buffer_sequence());
  805. ret.emplace_back(as::buffer(&fixed_header_, 1));
  806. ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
  807. ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
  808. ret.emplace_back(as::buffer(entries_));
  809. return ret;
  810. }
  811. /**
  812. * @brief Get whole size of sequence
  813. * @return whole size
  814. */
  815. std::size_t size() const {
  816. return
  817. 1 + // fixed header
  818. remaining_length_buf_.size() +
  819. remaining_length_;
  820. }
  821. /**
  822. * @brief Get number of element of const_buffer_sequence
  823. * @return number of element of const_buffer_sequence
  824. */
  825. static constexpr std::size_t num_of_const_buffer_sequence() {
  826. return 4; // fixed header, remaining length, packet_id, entries
  827. }
  828. /**
  829. * @brief Create one continuours buffer.
  830. * All sequence of buffers are concatinated.
  831. * It is useful to store to file/database.
  832. * @return continuous buffer
  833. */
  834. std::string continuous_buffer() const {
  835. std::string ret;
  836. ret.reserve(size());
  837. ret.push_back(static_cast<char>(fixed_header_));
  838. ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
  839. ret.append(packet_id_.data(), packet_id_.size());
  840. ret.append(entries_);
  841. return ret;
  842. }
  843. private:
  844. std::uint8_t fixed_header_;
  845. std::string entries_;
  846. boost::container::static_vector<char, PacketIdBytes> packet_id_;
  847. std::size_t remaining_length_;
  848. boost::container::static_vector<char, 4> remaining_length_buf_;
  849. };
  850. using suback_message = basic_suback_message<2>;
  851. template <std::size_t PacketIdBytes>
  852. class basic_unsubscribe_message {
  853. private:
  854. struct entry {
  855. entry(as::const_buffer topic_name)
  856. : topic_name_(force_move(topic_name)),
  857. topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast<std::uint16_t>(topic_name_.size())) }
  858. {}
  859. as::const_buffer topic_name_;
  860. boost::container::static_vector<char, 2> topic_name_length_buf_;
  861. };
  862. public:
  863. basic_unsubscribe_message(
  864. std::vector<as::const_buffer> params,
  865. typename packet_id_type<PacketIdBytes>::type packet_id
  866. )
  867. : fixed_header_(make_fixed_header(control_packet_type::unsubscribe, 0b0010)),
  868. remaining_length_(PacketIdBytes)
  869. {
  870. add_packet_id_to_buf<PacketIdBytes>::apply(packet_id_, packet_id);
  871. // Check for errors before allocating.
  872. for (auto&& e : params) {
  873. utf8string_check(e);
  874. }
  875. entries_.reserve(params.size());
  876. for (auto&& e : params) {
  877. entries_.emplace_back(e);
  878. remaining_length_ +=
  879. 2 + // topic name length
  880. e.size(); // topic name
  881. }
  882. auto rb = remaining_bytes(remaining_length_);
  883. for (auto e : rb) {
  884. remaining_length_buf_.push_back(e);
  885. }
  886. }
  887. /**
  888. * @brief Create const buffer sequence
  889. * it is for boost asio APIs
  890. * @return const buffer sequence
  891. */
  892. std::vector<as::const_buffer> const_buffer_sequence() const {
  893. std::vector<as::const_buffer> ret;
  894. ret.reserve(num_of_const_buffer_sequence());
  895. ret.emplace_back(as::buffer(&fixed_header_, 1));
  896. ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
  897. ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
  898. for (auto const& e : entries_) {
  899. ret.emplace_back(as::buffer(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size()));
  900. ret.emplace_back(as::buffer(e.topic_name_));
  901. }
  902. return ret;
  903. }
  904. /**
  905. * @brief Get whole size of sequence
  906. * @return whole size
  907. */
  908. std::size_t size() const {
  909. return
  910. 1 + // fixed header
  911. remaining_length_buf_.size() +
  912. remaining_length_;
  913. }
  914. /**
  915. * @brief Get number of element of const_buffer_sequence
  916. * @return number of element of const_buffer_sequence
  917. */
  918. std::size_t num_of_const_buffer_sequence() const {
  919. return
  920. 1 + // fixed header
  921. 1 + // remaining length
  922. 1 + // packet id
  923. entries_.size() * 2; // topic name length, topic name
  924. }
  925. /**
  926. * @brief Create one continuours buffer.
  927. * All sequence of buffers are concatinated.
  928. * It is useful to store to file/database.
  929. * @return continuous buffer
  930. */
  931. std::string continuous_buffer() const {
  932. std::string ret;
  933. ret.reserve(size());
  934. ret.push_back(static_cast<char>(fixed_header_));
  935. ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
  936. ret.append(packet_id_.data(), packet_id_.size());
  937. for (auto const& e : entries_) {
  938. ret.append(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size());
  939. ret.append(get_pointer(e.topic_name_), get_size(e.topic_name_));
  940. }
  941. return ret;
  942. }
  943. private:
  944. std::uint8_t fixed_header_;
  945. std::vector<entry> entries_;
  946. boost::container::static_vector<char, PacketIdBytes> packet_id_;
  947. std::size_t remaining_length_;
  948. boost::container::static_vector<char, 4> remaining_length_buf_;
  949. };
  950. using unsubscribe_message = basic_unsubscribe_message<2>;
  951. } // inline namespace v3_1_1
  952. } // namespace MQTT_NS
  953. #endif // MQTT_MESSAGE_HPP