broker.hpp 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194
  1. // Copyright Takatoshi Kondo 2017
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_BROKER_BROKER_HPP)
  7. #define MQTT_BROKER_BROKER_HPP
  8. #include <mqtt/config.hpp>
  9. #include <set>
  10. #include <boost/lexical_cast.hpp>
  11. #include <mqtt/broker/broker_namespace.hpp>
  12. #include <mqtt/optional.hpp>
  13. #include <mqtt/property.hpp>
  14. #include <mqtt/visitor_util.hpp>
  15. #include <mqtt/broker/session_state.hpp>
  16. #include <mqtt/broker/sub_con_map.hpp>
  17. #include <mqtt/broker/retained_messages.hpp>
  18. #include <mqtt/broker/retained_topic_map.hpp>
  19. #include <mqtt/broker/shared_target_impl.hpp>
  20. #include <mqtt/broker/mutex.hpp>
  21. #include <mqtt/broker/uuid.hpp>
  22. #include <mqtt/broker/constant.hpp>
  23. #include <mqtt/broker/security.hpp>
  24. MQTT_BROKER_NS_BEGIN
  25. namespace mi = boost::multi_index;
  26. namespace as = boost::asio;
  27. class broker_t {
  28. public:
  29. broker_t(as::io_context& timer_ioc)
  30. :timer_ioc_(timer_ioc),
  31. tim_disconnect_(timer_ioc_) {
  32. security.default_config();
  33. }
  34. // [begin] for test setting
  35. /**
  36. * @brief set_disconnect_delay adds a delay to disconnect operations.
  37. *
  38. * This makes the broker wait the specified amount between when a disconnect
  39. * is received from a client, and when the connection is actually closed in
  40. * the broker.
  41. *
  42. * @param delay - the amount to delay by
  43. */
  44. void set_disconnect_delay(std::chrono::steady_clock::duration delay) {
  45. delay_disconnect_ = force_move(delay);
  46. }
  47. /**
  48. * @brief set pingresp send operation
  49. *
  50. * @param b - if true, send pingresp when pingreq is received.
  51. * if false, doesn't send pingresp for test.
  52. */
  53. void set_pingresp(bool b) {
  54. pingresp_ = b;
  55. }
  56. /**
  57. * @brief set connack send operation
  58. *
  59. * @param b - if true, send connack when connect is received.
  60. * if false, doesn't send connack for test.
  61. */
  62. void set_connack(bool b) {
  63. connack_ = b;
  64. }
  65. /**
  66. * @brief configure the security settings
  67. */
  68. void set_security(broker::security&& security) {
  69. this->security = force_move(security);
  70. }
  71. // [end] for test setting
  72. /**
  73. * @brief handle_accept
  74. *
  75. * Call this function when a server (of whatever kind) has accepted a raw
  76. * connection from an MQTT client. By 'raw connection', this might be raw TCP sockets
  77. * or websockets, or completed a TLS handshake, or any other underlying transport
  78. * type, but what is not meant is that the mqtt client on the other end of the endpoint
  79. * has initiated the MQTT application protocol connection sequence with CONNECT or CONNACK
  80. * messages being sent or received.
  81. *
  82. * This function will assign several event handlers into the server (of whatever kind)
  83. * that is provided as a parameter. This includes connection handlers, disconnection handlers
  84. * and various handlers for a variety of MQTT message types.
  85. *
  86. * @param spep - The server (of whichever kind) to accept a connection on.
  87. */
  88. void handle_accept(con_sp_t spep) {
  89. con_wp_t wp(spep);
  90. endpoint_t& ep = *spep;
  91. ep.socket().lowest_layer().set_option(as::ip::tcp::no_delay(true));
  92. ep.set_auto_pub_response(false);
  93. ep.set_async_operation(true);
  94. ep.set_topic_alias_maximum(MQTT_NS::topic_alias_max);
  95. // set connection (lower than MQTT) level handlers
  96. ep.set_close_handler(
  97. [this, wp]
  98. (){
  99. con_sp_t sp = wp.lock();
  100. BOOST_ASSERT(sp);
  101. close_proc(force_move(sp), true);
  102. });
  103. ep.set_error_handler(
  104. [this, wp]
  105. (error_code ec){
  106. con_sp_t sp = wp.lock();
  107. BOOST_ASSERT(sp);
  108. auto ver = sp->get_protocol_version();
  109. MQTT_LOG("mqtt_broker", info)
  110. << MQTT_ADD_VALUE(address, this)
  111. << " error_handler is called. ec:" << ec.message() << " protocol_version:" << ver;
  112. auto send_response =
  113. [&](auto ec) {
  114. if (sp->connected()) {
  115. auto rc =
  116. [&] () -> MQTT_NS::optional<v5::disconnect_reason_code> {
  117. if (ec == boost::system::errc::protocol_error) {
  118. return MQTT_NS::v5::disconnect_reason_code::protocol_error;
  119. }
  120. else if (ec == boost::system::errc::bad_message) {
  121. return MQTT_NS::v5::disconnect_reason_code::malformed_packet;
  122. }
  123. return MQTT_NS::nullopt;
  124. }();
  125. if (rc) {
  126. MQTT_LOG("mqtt_broker", trace)
  127. << MQTT_ADD_VALUE(address, this)
  128. << "send DISCONNECT reason_code:" << rc.value();
  129. sp->async_disconnect(
  130. rc.value(),
  131. v5::properties{},
  132. [sp]
  133. (error_code ec) {
  134. if (ec) {
  135. MQTT_LOG("mqtt_broker", info)
  136. << MQTT_ADD_VALUE(address, sp.get())
  137. << ec.message();
  138. }
  139. }
  140. );
  141. }
  142. }
  143. else if (sp->underlying_connected()){
  144. // underlying layer connected, mqtt connecting
  145. // and protocol_version has already been determind as v5
  146. auto rc =
  147. [&] () -> MQTT_NS::optional<v5::connect_reason_code> {
  148. if (ec ==boost::system::errc::protocol_error) {
  149. return MQTT_NS::v5::connect_reason_code::protocol_error;
  150. }
  151. else if (ec == boost::system::errc::bad_message) {
  152. return MQTT_NS::v5::connect_reason_code::malformed_packet;
  153. }
  154. return MQTT_NS::nullopt;
  155. }();
  156. if (rc) {
  157. MQTT_LOG("mqtt_broker", trace)
  158. << MQTT_ADD_VALUE(address, this)
  159. << "send CONNACK reason_code:" << rc.value();
  160. if (connack_) sp->async_connack(
  161. false,
  162. rc.value(),
  163. [sp]
  164. (error_code ec) {
  165. if (ec) {
  166. MQTT_LOG("mqtt_broker", info)
  167. << MQTT_ADD_VALUE(address, sp.get())
  168. << ec.message();
  169. }
  170. }
  171. );
  172. }
  173. }
  174. };
  175. switch (ver) {
  176. case MQTT_NS::protocol_version::v5:
  177. // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#S4_13_Errors
  178. // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205
  179. //
  180. // The DISCONNECT packet is the final MQTT Control Packet sent from the Client or
  181. // the Server.
  182. send_response(ec);
  183. break;
  184. case MQTT_NS::protocol_version::v3_1_1:
  185. // DISCONNECT can't be sent by broker on v3.1.1
  186. //
  187. // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090
  188. //
  189. // The DISCONNECT Packet is the final Control Packet sent from the Client to the Server.
  190. // It indicates that the Client is disconnecting cleanly.
  191. //
  192. // At the MQTT connecting, there is no appropriate Connect Return Code on v3.1.1
  193. // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718035
  194. break;
  195. default:
  196. // The protocol_version is in the CONNECT packet.
  197. // Protocol error could happen before the protocol_version is parsed.
  198. break;
  199. }
  200. close_proc(force_move(sp), true);
  201. }
  202. );
  203. // set MQTT level handlers
  204. ep.set_connect_handler(
  205. [this, wp]
  206. (buffer client_id,
  207. optional<buffer> username,
  208. optional<buffer> password,
  209. optional<will> will,
  210. bool clean_session,
  211. std::uint16_t keep_alive) {
  212. con_sp_t sp = wp.lock();
  213. BOOST_ASSERT(sp);
  214. auto p = sp.get();
  215. try {
  216. return connect_handler(
  217. force_move(sp),
  218. force_move(client_id),
  219. force_move(username),
  220. force_move(password),
  221. force_move(will),
  222. clean_session,
  223. keep_alive,
  224. v5::properties{}
  225. );
  226. }
  227. catch (std::exception const& ex) {
  228. MQTT_LOG("mqtt_broker", error)
  229. << MQTT_ADD_VALUE(address, p)
  230. << ex.what();
  231. return true;
  232. }
  233. }
  234. );
  235. ep.set_v5_connect_handler(
  236. [this, wp]
  237. (buffer client_id,
  238. optional<buffer> username,
  239. optional<buffer> password,
  240. optional<will> will,
  241. bool clean_start,
  242. std::uint16_t keep_alive,
  243. v5::properties props) {
  244. con_sp_t sp = wp.lock();
  245. BOOST_ASSERT(sp);
  246. auto p = sp.get();
  247. try {
  248. return connect_handler(
  249. force_move(sp),
  250. force_move(client_id),
  251. force_move(username),
  252. force_move(password),
  253. force_move(will),
  254. clean_start,
  255. keep_alive,
  256. force_move(props)
  257. );
  258. }
  259. catch (std::exception const& ex) {
  260. MQTT_LOG("mqtt_broker", error)
  261. << MQTT_ADD_VALUE(address, p)
  262. << ex.what();
  263. return true;
  264. }
  265. }
  266. );
  267. ep.set_disconnect_handler(
  268. [this, wp]
  269. (){
  270. con_sp_t sp = wp.lock();
  271. BOOST_ASSERT(sp);
  272. auto p = sp.get();
  273. try {
  274. disconnect_handler(force_move(sp));
  275. }
  276. catch (std::exception const& ex) {
  277. MQTT_LOG("mqtt_broker", error)
  278. << MQTT_ADD_VALUE(address, p)
  279. << ex.what();
  280. }
  281. }
  282. );
  283. ep.set_v5_disconnect_handler(
  284. [this, wp]
  285. (v5::disconnect_reason_code /*reason_code*/, v5::properties props) {
  286. if (h_disconnect_props_) h_disconnect_props_(force_move(props));
  287. con_sp_t sp = wp.lock();
  288. BOOST_ASSERT(sp);
  289. auto p = sp.get();
  290. try {
  291. disconnect_handler(force_move(sp));
  292. }
  293. catch (std::exception const& ex) {
  294. MQTT_LOG("mqtt_broker", error)
  295. << MQTT_ADD_VALUE(address, p)
  296. << ex.what();
  297. }
  298. }
  299. );
  300. ep.set_puback_handler(
  301. [this, wp]
  302. (packet_id_t packet_id){
  303. con_sp_t sp = wp.lock();
  304. BOOST_ASSERT(sp);
  305. auto p = sp.get();
  306. try {
  307. return puback_handler(
  308. force_move(sp),
  309. packet_id,
  310. v5::puback_reason_code::success,
  311. v5::properties{}
  312. );
  313. }
  314. catch (std::exception const& ex) {
  315. MQTT_LOG("mqtt_broker", error)
  316. << MQTT_ADD_VALUE(address, p)
  317. << ex.what();
  318. return true;
  319. }
  320. }
  321. );
  322. ep.set_v5_puback_handler(
  323. [this, wp]
  324. (packet_id_t packet_id,
  325. v5::puback_reason_code reason_code,
  326. v5::properties props){
  327. con_sp_t sp = wp.lock();
  328. BOOST_ASSERT(sp);
  329. auto p = sp.get();
  330. try {
  331. return puback_handler(
  332. force_move(sp),
  333. packet_id,
  334. reason_code,
  335. force_move(props)
  336. );
  337. }
  338. catch (std::exception const& ex) {
  339. MQTT_LOG("mqtt_broker", error)
  340. << MQTT_ADD_VALUE(address, p)
  341. << ex.what();
  342. return true;
  343. }
  344. }
  345. );
  346. ep.set_pubrec_handler(
  347. [this, wp]
  348. (packet_id_t packet_id){
  349. con_sp_t sp = wp.lock();
  350. BOOST_ASSERT(sp);
  351. auto p = sp.get();
  352. try {
  353. return pubrec_handler(
  354. force_move(sp),
  355. packet_id,
  356. v5::pubrec_reason_code::success,
  357. v5::properties{}
  358. );
  359. }
  360. catch (std::exception const& ex) {
  361. MQTT_LOG("mqtt_broker", error)
  362. << MQTT_ADD_VALUE(address, p)
  363. << ex.what();
  364. return true;
  365. }
  366. }
  367. );
  368. ep.set_v5_pubrec_handler(
  369. [this, wp]
  370. (packet_id_t packet_id,
  371. v5::pubrec_reason_code reason_code,
  372. v5::properties props){
  373. con_sp_t sp = wp.lock();
  374. BOOST_ASSERT(sp);
  375. auto p = sp.get();
  376. try {
  377. return pubrec_handler(
  378. force_move(sp),
  379. packet_id,
  380. reason_code,
  381. force_move(props)
  382. );
  383. }
  384. catch (std::exception const& ex) {
  385. MQTT_LOG("mqtt_broker", error)
  386. << MQTT_ADD_VALUE(address, p)
  387. << ex.what();
  388. return true;
  389. }
  390. }
  391. );
  392. ep.set_pubrel_handler(
  393. [this, wp]
  394. (packet_id_t packet_id){
  395. con_sp_t sp = wp.lock();
  396. BOOST_ASSERT(sp);
  397. auto p = sp.get();
  398. try {
  399. return pubrel_handler(
  400. force_move(sp),
  401. packet_id,
  402. v5::pubrel_reason_code::success,
  403. v5::properties{}
  404. );
  405. }
  406. catch (std::exception const& ex) {
  407. MQTT_LOG("mqtt_broker", error)
  408. << MQTT_ADD_VALUE(address, p)
  409. << ex.what();
  410. return true;
  411. }
  412. }
  413. );
  414. ep.set_v5_pubrel_handler(
  415. [this, wp]
  416. (packet_id_t packet_id,
  417. v5::pubrel_reason_code reason_code,
  418. v5::properties props){
  419. con_sp_t sp = wp.lock();
  420. BOOST_ASSERT(sp);
  421. auto p = sp.get();
  422. try {
  423. return pubrel_handler(
  424. force_move(sp),
  425. packet_id,
  426. reason_code,
  427. force_move(props)
  428. );
  429. }
  430. catch (std::exception const& ex) {
  431. MQTT_LOG("mqtt_broker", error)
  432. << MQTT_ADD_VALUE(address, p)
  433. << ex.what();
  434. return true;
  435. }
  436. }
  437. );
  438. ep.set_pubcomp_handler(
  439. [this, wp]
  440. (packet_id_t packet_id){
  441. con_sp_t sp = wp.lock();
  442. BOOST_ASSERT(sp);
  443. auto p = sp.get();
  444. try {
  445. return pubcomp_handler(
  446. force_move(sp),
  447. packet_id,
  448. v5::pubcomp_reason_code::success,
  449. v5::properties{}
  450. );
  451. }
  452. catch (std::exception const& ex) {
  453. MQTT_LOG("mqtt_broker", error)
  454. << MQTT_ADD_VALUE(address, p)
  455. << ex.what();
  456. return true;
  457. }
  458. }
  459. );
  460. ep.set_v5_pubcomp_handler(
  461. [this, wp]
  462. (packet_id_t packet_id,
  463. v5::pubcomp_reason_code reason_code,
  464. v5::properties props){
  465. con_sp_t sp = wp.lock();
  466. BOOST_ASSERT(sp);
  467. auto p = sp.get();
  468. try {
  469. return pubcomp_handler(
  470. force_move(sp),
  471. packet_id,
  472. reason_code,
  473. force_move(props)
  474. );
  475. }
  476. catch (std::exception const& ex) {
  477. MQTT_LOG("mqtt_broker", error)
  478. << MQTT_ADD_VALUE(address, p)
  479. << ex.what();
  480. return true;
  481. }
  482. }
  483. );
  484. ep.set_publish_handler(
  485. [this, wp]
  486. (optional<packet_id_t> packet_id,
  487. publish_options pubopts,
  488. buffer topic_name,
  489. buffer contents){
  490. con_sp_t sp = wp.lock();
  491. BOOST_ASSERT(sp);
  492. auto p = sp.get();
  493. try {
  494. return publish_handler(
  495. force_move(sp),
  496. packet_id,
  497. pubopts,
  498. force_move(topic_name),
  499. force_move(contents),
  500. v5::properties{}
  501. );
  502. }
  503. catch (std::exception const& ex) {
  504. MQTT_LOG("mqtt_broker", error)
  505. << MQTT_ADD_VALUE(address, p)
  506. << ex.what();
  507. return true;
  508. }
  509. }
  510. );
  511. ep.set_v5_publish_handler(
  512. [this, wp]
  513. (optional<packet_id_t> packet_id,
  514. publish_options pubopts,
  515. buffer topic_name,
  516. buffer contents,
  517. v5::properties props
  518. ) {
  519. if (h_publish_props_) h_publish_props_(props);
  520. con_sp_t sp = wp.lock();
  521. BOOST_ASSERT(sp);
  522. auto p = sp.get();
  523. try {
  524. return publish_handler(
  525. force_move(sp),
  526. packet_id,
  527. pubopts,
  528. force_move(topic_name),
  529. force_move(contents),
  530. force_move(props)
  531. );
  532. }
  533. catch (std::exception const& ex) {
  534. MQTT_LOG("mqtt_broker", error)
  535. << MQTT_ADD_VALUE(address, p)
  536. << ex.what();
  537. return true;
  538. }
  539. }
  540. );
  541. ep.set_subscribe_handler(
  542. [this, wp]
  543. (packet_id_t packet_id,
  544. std::vector<subscribe_entry> entries) {
  545. con_sp_t sp = wp.lock();
  546. BOOST_ASSERT(sp);
  547. auto p = sp.get();
  548. try {
  549. return subscribe_handler(
  550. force_move(sp),
  551. packet_id,
  552. force_move(entries),
  553. v5::properties{}
  554. );
  555. }
  556. catch (std::exception const& ex) {
  557. MQTT_LOG("mqtt_broker", error)
  558. << MQTT_ADD_VALUE(address, p)
  559. << ex.what();
  560. return true;
  561. }
  562. }
  563. );
  564. ep.set_v5_subscribe_handler(
  565. [this, wp]
  566. (packet_id_t packet_id,
  567. std::vector<subscribe_entry> entries,
  568. v5::properties props
  569. ) {
  570. con_sp_t sp = wp.lock();
  571. BOOST_ASSERT(sp);
  572. auto p = sp.get();
  573. try {
  574. return subscribe_handler(
  575. force_move(sp),
  576. packet_id,
  577. force_move(entries),
  578. force_move(props)
  579. );
  580. }
  581. catch (std::exception const& ex) {
  582. MQTT_LOG("mqtt_broker", error)
  583. << MQTT_ADD_VALUE(address, p)
  584. << ex.what();
  585. return true;
  586. }
  587. }
  588. );
  589. ep.set_unsubscribe_handler(
  590. [this, wp]
  591. (packet_id_t packet_id,
  592. std::vector<unsubscribe_entry> entries) {
  593. con_sp_t sp = wp.lock();
  594. BOOST_ASSERT(sp);
  595. auto p = sp.get();
  596. try {
  597. return unsubscribe_handler(
  598. force_move(sp),
  599. packet_id,
  600. force_move(entries),
  601. v5::properties{}
  602. );
  603. }
  604. catch (std::exception const& ex) {
  605. MQTT_LOG("mqtt_broker", error)
  606. << MQTT_ADD_VALUE(address, p)
  607. << ex.what();
  608. return true;
  609. }
  610. }
  611. );
  612. ep.set_v5_unsubscribe_handler(
  613. [this, wp]
  614. (packet_id_t packet_id,
  615. std::vector<unsubscribe_entry> entries,
  616. v5::properties props
  617. ) {
  618. con_sp_t sp = wp.lock();
  619. BOOST_ASSERT(sp);
  620. auto p = sp.get();
  621. try {
  622. return unsubscribe_handler(
  623. force_move(sp),
  624. packet_id,
  625. force_move(entries),
  626. force_move(props)
  627. );
  628. }
  629. catch (std::exception const& ex) {
  630. MQTT_LOG("mqtt_broker", error)
  631. << MQTT_ADD_VALUE(address, p)
  632. << ex.what();
  633. return true;
  634. }
  635. }
  636. );
  637. ep.set_pingreq_handler(
  638. [this, wp] {
  639. con_sp_t sp = wp.lock();
  640. BOOST_ASSERT(sp);
  641. if (pingresp_) {
  642. auto p = sp.get();
  643. p->async_pingresp(
  644. [sp = force_move(sp)]
  645. (error_code ec) {
  646. if (ec) {
  647. MQTT_LOG("mqtt_broker", info)
  648. << MQTT_ADD_VALUE(address, sp.get())
  649. << ec.message();
  650. }
  651. }
  652. );
  653. }
  654. return true;
  655. }
  656. );
  657. ep.set_v5_auth_handler(
  658. [this]
  659. (v5::auth_reason_code /*reason_code*/,
  660. v5::properties props
  661. ) {
  662. if (h_auth_props_) h_auth_props_(force_move(props));
  663. return true;
  664. }
  665. );
  666. // Pass spep to keep lifetime.
  667. // It makes sure wp.lock() never return nullptr in the handlers below
  668. // including close_handler and error_handler.
  669. ep.start_session(spep);
  670. }
  671. void set_connack_props(v5::properties props) {
  672. connack_props_ = force_move(props);
  673. }
  674. void set_suback_props(v5::properties props) {
  675. suback_props_ = force_move(props);
  676. }
  677. void set_unsuback_props(v5::properties props) {
  678. unsuback_props_ = force_move(props);
  679. }
  680. void set_puback_props(v5::properties props) {
  681. puback_props_ = force_move(props);
  682. }
  683. void set_pubrec_props(v5::properties props) {
  684. pubrec_props_ = force_move(props);
  685. }
  686. void set_pubrel_props(v5::properties props) {
  687. pubrel_props_ = force_move(props);
  688. }
  689. void set_pubcomp_props(v5::properties props) {
  690. pubcomp_props_ = force_move(props);
  691. }
  692. void set_connect_props_handler(std::function<void(v5::properties const&)> h) {
  693. h_connect_props_ = force_move(h);
  694. }
  695. void set_disconnect_props_handler(std::function<void(v5::properties const&)> h) {
  696. h_disconnect_props_ = force_move(h);
  697. }
  698. void set_publish_props_handler(std::function<void(v5::properties const&)> h) {
  699. h_publish_props_ = force_move(h);
  700. }
  701. void set_puback_props_handler(std::function<void(v5::properties const&)> h) {
  702. h_puback_props_ = force_move(h);
  703. }
  704. void set_pubrec_props_handler(std::function<void(v5::properties const&)> h) {
  705. h_pubrec_props_ = force_move(h);
  706. }
  707. void set_pubrel_props_handler(std::function<void(v5::properties const&)> h) {
  708. h_pubrel_props_ = force_move(h);
  709. }
  710. void set_pubcomp_props_handler(std::function<void(v5::properties const&)> h) {
  711. h_pubcomp_props_ = force_move(h);
  712. }
  713. void set_subscribe_props_handler(std::function<void(v5::properties const&)> h) {
  714. h_subscribe_props_ = force_move(h);
  715. }
  716. void set_unsubscribe_props_handler(std::function<void(v5::properties const&)> h) {
  717. h_unsubscribe_props_ = force_move(h);
  718. }
  719. void set_auth_props_handler(std::function<void(v5::properties const&)> h) {
  720. h_auth_props_ = force_move(h);
  721. }
  722. void clear_all_sessions() {
  723. std::lock_guard<mutex> g(mtx_sessions_);
  724. sessions_.clear();
  725. }
  726. void clear_all_retained_topics() {
  727. std::lock_guard<mutex> g(mtx_retains_);
  728. retains_.clear();
  729. }
  730. private:
  731. static void force_disconnect(con_sp_t spep) {
  732. auto p = spep.get();
  733. p->async_force_disconnect(
  734. [spep = force_move(spep)]
  735. (error_code ec) {
  736. if (ec) {
  737. MQTT_LOG("mqtt_broker", info)
  738. << MQTT_ADD_VALUE(address, spep.get())
  739. << ec.message();
  740. }
  741. }
  742. );
  743. };
  744. static void disconnect_and_force_disconnect(con_sp_t spep, v5::disconnect_reason_code rc) {
  745. auto p = spep.get();
  746. p->async_disconnect(
  747. rc,
  748. v5::properties{},
  749. [spep = force_move(spep)]
  750. (error_code) mutable {
  751. force_disconnect(force_move(spep));
  752. }
  753. );
  754. };
  755. /**
  756. * @brief connect_handler Process an incoming CONNECT packet
  757. *
  758. * This is called by the connect handlers that handle_accept registers
  759. * on mqtt connections where the raw transport (tcp / tls / websocket / etc)
  760. * is established, but the CONNECT message has not been sent / received by
  761. * the mqtt client on the other end of the connection.
  762. *
  763. * When the CONNECT message is received, this function is called after some
  764. * basic pre-connection logic, to setup the record keeping that this broker
  765. * class needs to handle the connection and process subscriptions and publishing.
  766. *
  767. * @param clean_start - if the clean-start flag is set on the CONNECT message.
  768. * @param spep - shared pointer to the underlying connection (endpoint).
  769. * @param client_id - the id that the client wants to use (username will be prepended)
  770. * @param will - the last-will-and-testament of the connection, if any.
  771. */
  772. bool connect_handler(
  773. con_sp_t spep,
  774. buffer client_id,
  775. optional<buffer> noauth_username,
  776. optional<buffer> password,
  777. optional<will> will,
  778. bool clean_start,
  779. std::uint16_t /*keep_alive*/,
  780. v5::properties props
  781. ) {
  782. auto& ep = *spep;
  783. optional<std::string> username;
  784. if (ep.get_preauthed_user_name()) {
  785. if (security.login_cert(ep.get_preauthed_user_name().value())) {
  786. username = ep.get_preauthed_user_name();
  787. }
  788. }
  789. else if (!noauth_username && !password) {
  790. username = security.login_anonymous();
  791. }
  792. else if (noauth_username && password) {
  793. username = security.login(*noauth_username, *password);
  794. }
  795. // If login fails, try the unauthenticated user
  796. if (!username) username = security.login_unauthenticated();
  797. v5::properties connack_props;
  798. connect_param cp = handle_connect_props(ep, props, will);
  799. if (!username) {
  800. MQTT_LOG("mqtt_broker", trace)
  801. << MQTT_ADD_VALUE(address, this)
  802. << "User failed to login: "
  803. << (noauth_username ? std::string(*noauth_username) : std::string("anonymous user"));
  804. send_connack(
  805. ep,
  806. false, // session present
  807. false, // authenticated
  808. force_move(connack_props),
  809. [spep](error_code) {
  810. disconnect_and_force_disconnect(spep, v5::disconnect_reason_code::not_authorized);
  811. }
  812. );
  813. return true;
  814. }
  815. if (client_id.empty()) {
  816. if (!handle_empty_client_id(spep, client_id, clean_start, connack_props)) {
  817. return false;
  818. }
  819. // A new client id was generated
  820. client_id = buffer(string_view(spep->get_client_id()));
  821. }
  822. MQTT_LOG("mqtt_broker", trace)
  823. << MQTT_ADD_VALUE(address, this)
  824. << "User logged in as: '" << *username << "', client_id: " << client_id;
  825. /**
  826. * http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345311
  827. * 3.1.2.4 Clean Start
  828. * If a CONNECT packet is received with Clean Start is set to 1, the Client and Server MUST
  829. * discard any existing Session and start a new Session [MQTT-3.1.2-4]. Consequently,
  830. * the Session Present flag in CONNACK is always set to 0 if Clean Start is set to 1.
  831. */
  832. // Find any sessions that have the same client_id
  833. std::lock_guard<mutex> g(mtx_sessions_);
  834. auto& idx = sessions_.get<tag_cid>();
  835. auto it = idx.lower_bound(std::make_tuple(*username, client_id));
  836. if (it == idx.end() ||
  837. it->client_id() != client_id ||
  838. it->get_username() != *username
  839. ) {
  840. // new connection
  841. MQTT_LOG("mqtt_broker", trace)
  842. << MQTT_ADD_VALUE(address, this)
  843. << "cid:" << client_id
  844. << " new connection inserted.";
  845. it = idx.emplace_hint(
  846. it,
  847. timer_ioc_,
  848. mtx_subs_map_,
  849. subs_map_,
  850. shared_targets_,
  851. spep,
  852. client_id,
  853. *username,
  854. force_move(will),
  855. // will_sender
  856. [this](auto&&... params) {
  857. do_publish(std::forward<decltype(params)>(params)...);
  858. },
  859. force_move(cp.will_expiry_interval),
  860. force_move(cp.session_expiry_interval)
  861. );
  862. if (cp.response_topic_requested) {
  863. // set_response_topic never modify key part
  864. set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
  865. }
  866. send_connack(
  867. ep,
  868. false, // session present
  869. true, // authenticated
  870. force_move(connack_props)
  871. );
  872. }
  873. else if (it->online()) {
  874. // online overwrite
  875. if (close_proc_no_lock(it->con(), true, v5::disconnect_reason_code::session_taken_over)) {
  876. // remain offline
  877. if (clean_start) {
  878. // discard offline session
  879. MQTT_LOG("mqtt_broker", trace)
  880. << MQTT_ADD_VALUE(address, this)
  881. << "cid:" << client_id
  882. << "online connection exists, discard old one due to new one's clean_start and renew";
  883. if (cp.response_topic_requested) {
  884. // set_response_topic never modify key part
  885. set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
  886. }
  887. send_connack(
  888. ep,
  889. false, // session present
  890. true, // authenticated
  891. force_move(connack_props)
  892. );
  893. idx.modify(
  894. it,
  895. [&](auto& e) {
  896. e.clean();
  897. e.update_will(timer_ioc_, force_move(will), cp.will_expiry_interval);
  898. e.set_username(*username);
  899. // renew_session_expiry updates index
  900. e.renew_session_expiry(force_move(cp.session_expiry_interval));
  901. },
  902. [](auto&) { BOOST_ASSERT(false); }
  903. );
  904. }
  905. else {
  906. // inherit online session if previous session's session exists
  907. MQTT_LOG("mqtt_broker", trace)
  908. << MQTT_ADD_VALUE(address, this)
  909. << "cid:" << client_id
  910. << "online connection exists, inherit old one and renew";
  911. if (cp.response_topic_requested) {
  912. // set_response_topic never modify key part
  913. set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
  914. }
  915. send_connack(
  916. ep,
  917. true, // session present
  918. true, // authenticated
  919. force_move(connack_props),
  920. [
  921. this,
  922. &idx,
  923. it,
  924. will = force_move(will),
  925. clean_start,
  926. spep,
  927. will_expiry_interval = cp.will_expiry_interval,
  928. session_expiry_interval = cp.session_expiry_interval,
  929. username
  930. ]
  931. (error_code ec) mutable {
  932. if (ec) {
  933. MQTT_LOG("mqtt_broker", trace)
  934. << MQTT_ADD_VALUE(address, this)
  935. << ec.message();
  936. return;
  937. }
  938. idx.modify(
  939. it,
  940. [&](auto& e) {
  941. e.renew(spep, clean_start);
  942. e.set_username(*username);
  943. e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
  944. // renew_session_expiry updates index
  945. e.renew_session_expiry(force_move(session_expiry_interval));
  946. e.send_inflight_messages();
  947. e.send_all_offline_messages();
  948. },
  949. [](auto&) { BOOST_ASSERT(false); }
  950. );
  951. }
  952. );
  953. }
  954. }
  955. else {
  956. // new connection
  957. MQTT_LOG("mqtt_broker", trace)
  958. << MQTT_ADD_VALUE(address, this)
  959. << "cid:" << client_id
  960. << "online connection exists, discard old one due to session_expiry and renew";
  961. bool inserted;
  962. std::tie(it, inserted) = idx.emplace(
  963. timer_ioc_,
  964. mtx_subs_map_,
  965. subs_map_,
  966. shared_targets_,
  967. spep,
  968. client_id,
  969. *username,
  970. force_move(will),
  971. // will_sender
  972. [this](auto&&... params) {
  973. do_publish(std::forward<decltype(params)>(params)...);
  974. },
  975. force_move(cp.will_expiry_interval),
  976. force_move(cp.session_expiry_interval)
  977. );
  978. BOOST_ASSERT(inserted);
  979. if (cp.response_topic_requested) {
  980. // set_response_topic never modify key part
  981. set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
  982. }
  983. send_connack(
  984. ep,
  985. false, // session present
  986. true, // authenticated
  987. force_move(connack_props)
  988. );
  989. }
  990. }
  991. else {
  992. // offline -> online
  993. if (clean_start) {
  994. // discard offline session
  995. MQTT_LOG("mqtt_broker", trace)
  996. << MQTT_ADD_VALUE(address, this)
  997. << "cid:" << client_id
  998. << "offline connection exists, discard old one due to new one's clean_start and renew";
  999. if (cp.response_topic_requested) {
  1000. // set_response_topic never modify key part
  1001. set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
  1002. }
  1003. send_connack(
  1004. ep,
  1005. false, // session present
  1006. true, // authenticated
  1007. force_move(connack_props)
  1008. );
  1009. idx.modify(
  1010. it,
  1011. [&](auto& e) {
  1012. e.clean();
  1013. e.renew(spep, clean_start);
  1014. e.update_will(timer_ioc_, force_move(will), cp.will_expiry_interval);
  1015. e.set_username(*username);
  1016. // renew_session_expiry updates index
  1017. e.renew_session_expiry(force_move(cp.session_expiry_interval));
  1018. },
  1019. [](auto&) { BOOST_ASSERT(false); }
  1020. );
  1021. }
  1022. else {
  1023. // inherit offline session
  1024. MQTT_LOG("mqtt_broker", trace)
  1025. << MQTT_ADD_VALUE(address, this)
  1026. << "cid:" << client_id
  1027. << "offline connection exists, inherit old one and renew";
  1028. if (cp.response_topic_requested) {
  1029. // set_response_topic never modify key part
  1030. set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
  1031. }
  1032. send_connack(
  1033. ep,
  1034. true, // session present
  1035. true, // authenticated
  1036. force_move(connack_props),
  1037. [
  1038. this,
  1039. &idx,
  1040. it,
  1041. will = force_move(will),
  1042. clean_start,
  1043. spep,
  1044. will_expiry_interval = cp.will_expiry_interval,
  1045. session_expiry_interval = cp.session_expiry_interval,
  1046. username
  1047. ]
  1048. (error_code ec) mutable {
  1049. if (ec) {
  1050. MQTT_LOG("mqtt_broker", trace)
  1051. << MQTT_ADD_VALUE(address, this)
  1052. << ec.message();
  1053. return;
  1054. }
  1055. idx.modify(
  1056. it,
  1057. [&](auto& e) {
  1058. e.renew(spep, clean_start);
  1059. e.set_username(*username);
  1060. e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
  1061. // renew_session_expiry updates index
  1062. e.renew_session_expiry(force_move(session_expiry_interval));
  1063. e.send_inflight_messages();
  1064. e.send_all_offline_messages();
  1065. },
  1066. [](auto&) { BOOST_ASSERT(false); }
  1067. );
  1068. }
  1069. );
  1070. }
  1071. }
  1072. return true;
  1073. }
  1074. struct connect_param {
  1075. optional<std::chrono::steady_clock::duration> session_expiry_interval;
  1076. optional<std::chrono::steady_clock::duration> will_expiry_interval;
  1077. bool response_topic_requested = false;
  1078. };
  1079. connect_param handle_connect_props(
  1080. endpoint_t& ep,
  1081. v5::properties const& props,
  1082. optional<will> const& will
  1083. ) {
  1084. connect_param cp;
  1085. if (ep.get_protocol_version() == protocol_version::v5) {
  1086. {
  1087. auto v = get_property<v5::property::session_expiry_interval>(props);
  1088. if (v && v.value().val() != 0) {
  1089. cp.session_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
  1090. }
  1091. }
  1092. {
  1093. auto v = get_property<v5::property::request_response_information>(props);
  1094. if (v && v.value().val() == 1) {
  1095. cp.response_topic_requested = true;
  1096. }
  1097. }
  1098. if (will) {
  1099. auto v = get_property<v5::property::message_expiry_interval>(will.value().props());
  1100. if (v) {
  1101. cp.will_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
  1102. }
  1103. }
  1104. if (h_connect_props_) {
  1105. h_connect_props_(props);
  1106. }
  1107. }
  1108. return cp;
  1109. }
  1110. void send_connack(
  1111. endpoint_t& ep,
  1112. bool session_present,
  1113. bool authenticated,
  1114. v5::properties props,
  1115. std::function<void(error_code)> finish = [](error_code){}
  1116. ) {
  1117. // Reply to the connect message.
  1118. switch (ep.get_protocol_version()) {
  1119. case protocol_version::v3_1_1:
  1120. if (connack_) ep.async_connack(
  1121. session_present,
  1122. authenticated ? connect_return_code::accepted
  1123. : connect_return_code::not_authorized,
  1124. [finish = force_move(finish)]
  1125. (error_code ec) {
  1126. finish(ec);
  1127. }
  1128. );
  1129. break;
  1130. case protocol_version::v5:
  1131. // connack_props_ member varible is for testing
  1132. if (connack_props_.empty()) {
  1133. // props local variable is is for real case
  1134. props.emplace_back(v5::property::topic_alias_maximum{topic_alias_max});
  1135. props.emplace_back(v5::property::receive_maximum{receive_maximum_max});
  1136. if (connack_) ep.async_connack(
  1137. session_present,
  1138. authenticated ? v5::connect_reason_code::success
  1139. : v5::connect_reason_code::not_authorized,
  1140. force_move(props),
  1141. [finish = force_move(finish)]
  1142. (error_code ec) {
  1143. finish(ec);
  1144. }
  1145. );
  1146. }
  1147. else {
  1148. // use connack_props_ for testing
  1149. if (connack_) ep.async_connack(
  1150. session_present,
  1151. authenticated ? v5::connect_reason_code::success
  1152. : v5::connect_reason_code::not_authorized,
  1153. connack_props_,
  1154. [finish = force_move(finish)]
  1155. (error_code ec) {
  1156. finish(ec);
  1157. }
  1158. );
  1159. }
  1160. break;
  1161. default:
  1162. BOOST_ASSERT(false);
  1163. break;
  1164. }
  1165. }
  1166. void remove_rule(std::size_t rule_nr) {
  1167. security.remove_auth(rule_nr);
  1168. }
  1169. void set_response_topic(session_state& s, v5::properties& connack_props, std::string const &username) {
  1170. auto response_topic =
  1171. [&] {
  1172. if (auto rt_opt = s.get_response_topic()) {
  1173. return rt_opt.value();
  1174. }
  1175. auto rt = create_uuid_string();
  1176. s.set_response_topic(rt);
  1177. return rt;
  1178. } ();
  1179. auto rule_nr = security.add_auth(
  1180. response_topic,
  1181. { "@any" }, MQTT_NS::broker::security::authorization::type::allow,
  1182. { username }, MQTT_NS::broker::security::authorization::type::allow
  1183. );
  1184. s.set_clean_handler(
  1185. [this, response_topic, rule_nr]() {
  1186. std::lock_guard<mutex> g(mtx_retains_);
  1187. retains_.erase(response_topic);
  1188. remove_rule(rule_nr);
  1189. }
  1190. );
  1191. connack_props.emplace_back(
  1192. v5::property::response_topic(
  1193. allocate_buffer(response_topic)
  1194. )
  1195. );
  1196. }
  1197. bool handle_empty_client_id(
  1198. con_sp_t spep,
  1199. buffer const& client_id,
  1200. bool clean_start,
  1201. v5::properties& connack_props
  1202. ) {
  1203. auto& ep = *spep;
  1204. switch (ep.get_protocol_version()) {
  1205. case protocol_version::v3_1_1:
  1206. if (client_id.empty()) {
  1207. if (clean_start) {
  1208. ep.set_client_id(create_uuid_string());
  1209. }
  1210. else {
  1211. // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242
  1212. // If the Client supplies a zero-byte ClientId,
  1213. // the Client MUST also set CleanSession to 1 [MQTT-3.1.3-7].
  1214. // If it's a not a clean session, but no client id is provided,
  1215. // we would have no way to map this connection's session to a new connection later.
  1216. // So the connection must be rejected.
  1217. if (connack_) {
  1218. ep.async_connack(
  1219. false,
  1220. connect_return_code::identifier_rejected,
  1221. [&ep, spep = force_move(spep)]
  1222. (error_code ec) mutable {
  1223. if (ec) {
  1224. MQTT_LOG("mqtt_broker", info)
  1225. << MQTT_ADD_VALUE(address, spep.get())
  1226. << ec.message();
  1227. }
  1228. ep.async_force_disconnect(
  1229. [spep = force_move(spep)]
  1230. (error_code ec) {
  1231. if (ec) {
  1232. MQTT_LOG("mqtt_broker", info)
  1233. << MQTT_ADD_VALUE(address, spep.get())
  1234. << ec.message();
  1235. }
  1236. }
  1237. );
  1238. }
  1239. );
  1240. }
  1241. return false;
  1242. }
  1243. }
  1244. break;
  1245. case protocol_version::v5:
  1246. if (client_id.empty()) {
  1247. // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
  1248. // A Server MAY allow a Client to supply a ClientID that has a length of zero bytes,
  1249. // however if it does so the Server MUST treat this as a special case and assign a
  1250. // unique ClientID to that Client [MQTT-3.1.3-6]. It MUST then process the
  1251. // CONNECT packet as if the Client had provided that unique ClientID,
  1252. // and MUST return the Assigned Client Identifier in the CONNACK packet [MQTT-3.1.3-7].
  1253. // If the Server rejects the ClientID it MAY respond to the CONNECT packet with a CONNACK
  1254. // using Reason Code 0x85 (Client Identifier not valid) as described in section 4.13
  1255. // Handling errors, and then it MUST close the Network Connection [MQTT-3.1.3-8].
  1256. //
  1257. // mqtt_cpp author's note: On v5.0, no Clean Start restriction is described.
  1258. ep.set_client_id(create_uuid_string());
  1259. connack_props.emplace_back(
  1260. v5::property::assigned_client_identifier(buffer(string_view(ep.get_client_id())))
  1261. );
  1262. }
  1263. break;
  1264. default:
  1265. BOOST_ASSERT(false);
  1266. return false;
  1267. }
  1268. return true;
  1269. }
  1270. void disconnect_handler(
  1271. con_sp_t spep
  1272. ) {
  1273. if (delay_disconnect_) {
  1274. tim_disconnect_.expires_after(delay_disconnect_.value());
  1275. tim_disconnect_.wait();
  1276. }
  1277. close_proc(force_move(spep), false);
  1278. }
  1279. /**
  1280. * @brief close_proc_no_lock - clean up a connection that has been closed.
  1281. *
  1282. * @param ep - The underlying server (of whichever type) that is disconnecting.
  1283. * @param send_will - Whether to publish this connections last will
  1284. * @return true if offline session is remained, otherwise false
  1285. */
  1286. // TODO: Maybe change the name of this function.
  1287. bool close_proc_no_lock(
  1288. con_sp_t spep,
  1289. bool send_will,
  1290. optional<v5::disconnect_reason_code> rc) {
  1291. endpoint_t& ep = *spep;
  1292. auto& idx = sessions_.get<tag_con>();
  1293. auto it = idx.find(spep);
  1294. // act_sess_it == act_sess_idx.end() could happen if broker accepts
  1295. // the session from client but the client closes the session before sending
  1296. // MQTT `CONNECT` message.
  1297. // In this case, do nothing is correct behavior.
  1298. if (it == idx.end()) return false;
  1299. bool session_clear =
  1300. [&] {
  1301. if (ep.get_protocol_version() == protocol_version::v3_1_1) {
  1302. return ep.clean_session();
  1303. }
  1304. else {
  1305. BOOST_ASSERT(ep.get_protocol_version() == protocol_version::v5);
  1306. auto const& sei_opt = it->session_expiry_interval();
  1307. return !sei_opt || sei_opt.value() == std::chrono::steady_clock::duration::zero();
  1308. }
  1309. } ();
  1310. auto do_send_will =
  1311. [&](session_state& ss) {
  1312. if (send_will) {
  1313. ss.send_will();
  1314. }
  1315. else {
  1316. ss.clear_will();
  1317. }
  1318. };
  1319. if (session_clear) {
  1320. // const_cast is appropriate here
  1321. // See https://github.com/boostorg/multi_index/issues/50
  1322. auto& ss = const_cast<session_state&>(*it);
  1323. do_send_will(ss);
  1324. if (rc) {
  1325. MQTT_LOG("mqtt_broker", trace)
  1326. << MQTT_ADD_VALUE(address, spep.get())
  1327. << "disconnect_and_force_disconnect(async) cid:" << ss.client_id();
  1328. disconnect_and_force_disconnect(spep, rc.value());
  1329. }
  1330. else {
  1331. MQTT_LOG("mqtt_broker", trace)
  1332. << MQTT_ADD_VALUE(address, spep.get())
  1333. << "force_disconnect(async) cid:" << ss.client_id();
  1334. force_disconnect(spep);
  1335. }
  1336. idx.erase(it);
  1337. BOOST_ASSERT(sessions_.get<tag_con>().find(spep) == sessions_.get<tag_con>().end());
  1338. return false;
  1339. }
  1340. else {
  1341. idx.modify(
  1342. it,
  1343. [&](session_state& ss) {
  1344. do_send_will(ss);
  1345. if (rc) {
  1346. MQTT_LOG("mqtt_broker", trace)
  1347. << MQTT_ADD_VALUE(address, spep.get())
  1348. << "disconnect_and_force_disconnect(async) cid:" << ss.client_id();
  1349. disconnect_and_force_disconnect(spep, rc.value());
  1350. }
  1351. else {
  1352. MQTT_LOG("mqtt_broker", trace)
  1353. << MQTT_ADD_VALUE(address, spep.get())
  1354. << "force_disconnect(async) cid:" << ss.client_id();
  1355. force_disconnect(spep);
  1356. }
  1357. // become_offline updates index
  1358. ss.become_offline(
  1359. [this]
  1360. (std::shared_ptr<as::steady_timer> const& sp_tim) {
  1361. sessions_.get<tag_tim>().erase(sp_tim);
  1362. }
  1363. );
  1364. },
  1365. [](auto&) { BOOST_ASSERT(false); }
  1366. );
  1367. return true;
  1368. }
  1369. }
  1370. /**
  1371. * @brief close_proc - clean up a connection that has been closed.
  1372. *
  1373. * @param ep - The underlying server (of whichever type) that is disconnecting.
  1374. * @param send_will - Whether to publish this connections last will
  1375. * @param rc - Reason Code for send pack DISCONNECT
  1376. * @return true if offline session is remained, otherwise false
  1377. */
  1378. // TODO: Maybe change the name of this function.
  1379. bool close_proc(
  1380. con_sp_t spep,
  1381. bool send_will,
  1382. optional<v5::disconnect_reason_code> rc = nullopt
  1383. ) {
  1384. std::lock_guard<mutex> g(mtx_sessions_);
  1385. return close_proc_no_lock(force_move(spep), send_will, rc);
  1386. }
  1387. bool publish_handler(
  1388. con_sp_t spep,
  1389. optional<packet_id_t> packet_id,
  1390. publish_options pubopts,
  1391. buffer topic_name,
  1392. buffer contents,
  1393. v5::properties props) {
  1394. auto& ep = *spep;
  1395. std::shared_lock<mutex> g(mtx_sessions_);
  1396. auto& idx = sessions_.get<tag_con>();
  1397. auto it = idx.find(spep);
  1398. // broker uses async_* APIs
  1399. // If broker erase a connection, then async_force_disconnect()
  1400. // and/or async_force_disconnect () is called.
  1401. // During async operation, spep is valid but it has already been
  1402. // erased from sessions_
  1403. if (it == idx.end()) return true;
  1404. auto send_pubres =
  1405. [&] (bool authorized = true) {
  1406. switch (pubopts.get_qos()) {
  1407. case qos::at_least_once:
  1408. ep.async_puback(
  1409. packet_id.value(),
  1410. authorized ? v5::puback_reason_code::success
  1411. : v5::puback_reason_code::not_authorized,
  1412. puback_props_,
  1413. [spep = force_move(spep)]
  1414. (error_code ec) {
  1415. if (ec) {
  1416. MQTT_LOG("mqtt_broker", info)
  1417. << MQTT_ADD_VALUE(address, spep.get())
  1418. << ec.message();
  1419. }
  1420. }
  1421. );
  1422. break;
  1423. case qos::exactly_once: {
  1424. ep.async_pubrec(
  1425. packet_id.value(),
  1426. authorized ? v5::pubrec_reason_code::success
  1427. : v5::pubrec_reason_code::not_authorized,
  1428. pubrec_props_,
  1429. [spep = force_move(spep)]
  1430. (error_code ec) {
  1431. if (ec) {
  1432. MQTT_LOG("mqtt_broker", info)
  1433. << MQTT_ADD_VALUE(address, spep.get())
  1434. << ec.message();
  1435. }
  1436. }
  1437. );
  1438. } break;
  1439. default:
  1440. break;
  1441. }
  1442. };
  1443. // See if this session is authorized to publish this topic
  1444. if (security.auth_pub(topic_name, it->get_username()) != security::authorization::type::allow) {
  1445. // Publish not authorized
  1446. send_pubres(false);
  1447. return true;
  1448. }
  1449. v5::properties forward_props;
  1450. for (auto&& p : props) {
  1451. MQTT_NS::visit(
  1452. make_lambda_visitor(
  1453. [](v5::property::topic_alias&&) {
  1454. // TopicAlias is not forwarded
  1455. // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113
  1456. // A receiver MUST NOT carry forward any Topic Alias mappings from
  1457. // one Network Connection to another [MQTT-3.3.2-7].
  1458. },
  1459. [&ep](v5::property::subscription_identifier&& p) {
  1460. MQTT_LOG("mqtt_broker", warning)
  1461. << MQTT_ADD_VALUE(address, &ep)
  1462. << "Subscription Identifier from client not forwarded sid:" << p.val();
  1463. },
  1464. [&forward_props](auto&& p) {
  1465. forward_props.push_back(force_move(p));
  1466. }
  1467. ),
  1468. force_move(p)
  1469. );
  1470. }
  1471. do_publish(
  1472. *it,
  1473. force_move(topic_name),
  1474. force_move(contents),
  1475. pubopts.get_qos() | pubopts.get_retain(), // remove dup flag
  1476. force_move(forward_props)
  1477. );
  1478. send_pubres();
  1479. return true;
  1480. }
  1481. bool puback_handler(
  1482. con_sp_t spep,
  1483. packet_id_t packet_id,
  1484. v5::puback_reason_code /*reason_code*/,
  1485. v5::properties /*props*/) {
  1486. std::shared_lock<mutex> g(mtx_sessions_);
  1487. auto& idx = sessions_.get<tag_con>();
  1488. auto it = idx.find(spep);
  1489. // broker uses async_* APIs
  1490. // If broker erase a connection, then async_force_disconnect()
  1491. // and/or async_force_disconnect () is called.
  1492. // During async operation, spep is valid but it has already been
  1493. // erased from sessions_
  1494. if (it == idx.end()) return true;
  1495. // const_cast is appropriate here
  1496. // See https://github.com/boostorg/multi_index/issues/50
  1497. auto& ss = const_cast<session_state&>(*it);
  1498. ss.erase_inflight_message_by_packet_id(packet_id);
  1499. ss.send_offline_messages_by_packet_id_release();
  1500. return true;
  1501. }
  1502. bool pubrec_handler(
  1503. con_sp_t spep,
  1504. packet_id_t packet_id,
  1505. v5::pubrec_reason_code reason_code,
  1506. v5::properties /*props*/) {
  1507. std::shared_lock<mutex> g(mtx_sessions_);
  1508. auto& idx = sessions_.get<tag_con>();
  1509. auto it = idx.find(spep);
  1510. // broker uses async_* APIs
  1511. // If broker erase a connection, then async_force_disconnect()
  1512. // and/or async_force_disconnect () is called.
  1513. // During async operation, spep is valid but it has already been
  1514. // erased from sessions_
  1515. if (it == idx.end()) return true;
  1516. // const_cast is appropriate here
  1517. // See https://github.com/boostorg/multi_index/issues/50
  1518. auto& ss = const_cast<session_state&>(*it);
  1519. ss.erase_inflight_message_by_packet_id(packet_id);
  1520. if (is_error(reason_code)) return true;
  1521. auto& ep = *spep;
  1522. switch (ep.get_protocol_version()) {
  1523. case protocol_version::v3_1_1:
  1524. ep.async_pubrel(
  1525. packet_id,
  1526. [spep = force_move(spep)]
  1527. (error_code ec) {
  1528. if (ec) {
  1529. MQTT_LOG("mqtt_broker", info)
  1530. << MQTT_ADD_VALUE(address, spep.get())
  1531. << ec.message();
  1532. }
  1533. }
  1534. );
  1535. break;
  1536. case protocol_version::v5:
  1537. ep.async_pubrel(
  1538. packet_id,
  1539. v5::pubrel_reason_code::success,
  1540. pubrel_props_,
  1541. [spep = force_move(spep)]
  1542. (error_code ec) {
  1543. if (ec) {
  1544. MQTT_LOG("mqtt_broker", info)
  1545. << MQTT_ADD_VALUE(address, spep.get())
  1546. << ec.message();
  1547. }
  1548. }
  1549. );
  1550. break;
  1551. default:
  1552. BOOST_ASSERT(false);
  1553. break;
  1554. }
  1555. return true;
  1556. }
  1557. bool pubrel_handler(
  1558. con_sp_t spep,
  1559. packet_id_t packet_id,
  1560. v5::pubrel_reason_code reason_code,
  1561. v5::properties /*props*/) {
  1562. std::shared_lock<mutex> g(mtx_sessions_);
  1563. auto& idx = sessions_.get<tag_con>();
  1564. auto it = idx.find(spep);
  1565. // broker uses async_* APIs
  1566. // If broker erase a connection, then async_force_disconnect()
  1567. // and/or async_force_disconnect () is called.
  1568. // During async operation, spep is valid but it has already been
  1569. // erased from sessions_
  1570. if (it == idx.end()) return true;
  1571. auto& ep = *spep;
  1572. switch (ep.get_protocol_version()) {
  1573. case protocol_version::v3_1_1:
  1574. ep.async_pubcomp(
  1575. packet_id,
  1576. [spep = force_move(spep)]
  1577. (error_code ec) {
  1578. if (ec) {
  1579. MQTT_LOG("mqtt_broker", info)
  1580. << MQTT_ADD_VALUE(address, spep.get())
  1581. << ec.message();
  1582. }
  1583. }
  1584. );
  1585. break;
  1586. case protocol_version::v5:
  1587. ep.async_pubcomp(
  1588. packet_id,
  1589. // pubcomp reason code is the same as pubrel one
  1590. static_cast<v5::pubcomp_reason_code>(reason_code),
  1591. pubcomp_props_,
  1592. [spep = force_move(spep)]
  1593. (error_code ec) {
  1594. if (ec) {
  1595. MQTT_LOG("mqtt_broker", info)
  1596. << MQTT_ADD_VALUE(address, spep.get())
  1597. << ec.message();
  1598. }
  1599. }
  1600. );
  1601. break;
  1602. default:
  1603. BOOST_ASSERT(false);
  1604. break;
  1605. }
  1606. return true;
  1607. }
  1608. bool pubcomp_handler(
  1609. con_sp_t spep,
  1610. packet_id_t packet_id,
  1611. v5::pubcomp_reason_code /*reason_code*/,
  1612. v5::properties /*props*/){
  1613. std::shared_lock<mutex> g(mtx_sessions_);
  1614. auto& idx = sessions_.get<tag_con>();
  1615. auto it = idx.find(spep);
  1616. // broker uses async_* APIs
  1617. // If broker erase a connection, then async_force_disconnect()
  1618. // and/or async_force_disconnect () is called.
  1619. // During async operation, spep is valid but it has already been
  1620. // erased from sessions_
  1621. if (it == idx.end()) return true;
  1622. // const_cast is appropriate here
  1623. // See https://github.com/boostorg/multi_index/issues/50
  1624. auto& ss = const_cast<session_state&>(*it);
  1625. ss.erase_inflight_message_by_packet_id(packet_id);
  1626. ss.send_offline_messages_by_packet_id_release();
  1627. return true;
  1628. }
  1629. bool subscribe_handler(
  1630. con_sp_t spep,
  1631. packet_id_t packet_id,
  1632. std::vector<subscribe_entry> entries,
  1633. v5::properties props) {
  1634. auto& ep = *spep;
  1635. std::shared_lock<mutex> g(mtx_sessions_);
  1636. auto& idx = sessions_.get<tag_con>();
  1637. auto it = idx.find(spep);
  1638. // broker uses async_* APIs
  1639. // If broker erase a connection, then async_force_disconnect()
  1640. // and/or async_force_disconnect () is called.
  1641. // During async operation, spep is valid but it has already been
  1642. // erased from sessions_
  1643. if (it == idx.end()) return true;
  1644. // The element of sessions_ must have longer lifetime
  1645. // than corresponding subscription.
  1646. // Because the subscription store the reference of the element.
  1647. optional<session_state_ref> ssr_opt;
  1648. // const_cast is appropriate here
  1649. // See https://github.com/boostorg/multi_index/issues/50
  1650. auto& ss = const_cast<session_state&>(*it);
  1651. ssr_opt.emplace(ss);
  1652. BOOST_ASSERT(ssr_opt);
  1653. session_state_ref ssr {ssr_opt.value()};
  1654. auto publish_proc =
  1655. [this, &ssr](retain_t const& r, qos qos_value, optional<std::size_t> sid) {
  1656. auto props = r.props;
  1657. if (sid) {
  1658. props.push_back(v5::property::subscription_identifier(*sid));
  1659. }
  1660. if (r.tim_message_expiry) {
  1661. auto d =
  1662. std::chrono::duration_cast<std::chrono::seconds>(
  1663. r.tim_message_expiry->expiry() - std::chrono::steady_clock::now()
  1664. ).count();
  1665. set_property<v5::property::message_expiry_interval>(
  1666. props,
  1667. v5::property::message_expiry_interval(
  1668. static_cast<uint32_t>(d)
  1669. )
  1670. );
  1671. }
  1672. ssr.get().publish(
  1673. timer_ioc_,
  1674. r.topic,
  1675. r.contents,
  1676. std::min(r.qos_value, qos_value) | MQTT_NS::retain::yes,
  1677. props
  1678. );
  1679. };
  1680. std::vector<std::function<void()>> retain_deliver;
  1681. retain_deliver.reserve(entries.size());
  1682. // subscription identifier
  1683. optional<std::size_t> sid;
  1684. // An in-order list of qos settings, used to send the reply.
  1685. // The MQTT protocol 3.1.1 - 3.8.4 Response - paragraph 6
  1686. // allows the server to grant a lower QOS than requested
  1687. // So we reply with the QOS setting that was granted
  1688. // not the one requested.
  1689. switch (ep.get_protocol_version()) {
  1690. case protocol_version::v3_1_1: {
  1691. std::vector<suback_return_code> res;
  1692. res.reserve(entries.size());
  1693. for (auto& e : entries) {
  1694. if (security.is_subscribe_authorized(ss.get_username(), e.topic_filter)) {
  1695. res.emplace_back(qos_to_suback_return_code(e.subopts.get_qos())); // converts to granted_qos_x
  1696. ssr.get().subscribe(
  1697. force_move(e.share_name),
  1698. e.topic_filter,
  1699. e.subopts,
  1700. [&] {
  1701. std::shared_lock<mutex> g(mtx_retains_);
  1702. retains_.find(
  1703. e.topic_filter,
  1704. [&](retain_t const& r) {
  1705. retain_deliver.emplace_back(
  1706. [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
  1707. publish_proc(r, qos_value, sid);
  1708. }
  1709. );
  1710. }
  1711. );
  1712. }
  1713. );
  1714. }
  1715. else {
  1716. // User not authorized to subscribe to topic filter
  1717. res.emplace_back(suback_return_code::failure);
  1718. }
  1719. }
  1720. // Acknowledge the subscriptions, and the registered QOS settings
  1721. ep.async_suback(
  1722. packet_id,
  1723. force_move(res),
  1724. [spep = force_move(spep)]
  1725. (error_code ec) {
  1726. if (ec) {
  1727. MQTT_LOG("mqtt_broker", info)
  1728. << MQTT_ADD_VALUE(address, spep.get())
  1729. << ec.message();
  1730. }
  1731. }
  1732. );
  1733. } break;
  1734. case protocol_version::v5: {
  1735. // Get subscription identifier
  1736. auto v = get_property<v5::property::subscription_identifier>(props);
  1737. if (v && v.value().val() != 0) {
  1738. sid.emplace(v.value().val());
  1739. }
  1740. std::vector<v5::suback_reason_code> res;
  1741. res.reserve(entries.size());
  1742. for (auto& e : entries) {
  1743. if (security.is_subscribe_authorized(ss.get_username(), e.topic_filter)) {
  1744. res.emplace_back(v5::qos_to_suback_reason_code(e.subopts.get_qos())); // converts to granted_qos_x
  1745. ssr.get().subscribe(
  1746. force_move(e.share_name),
  1747. e.topic_filter,
  1748. e.subopts,
  1749. [&] {
  1750. std::shared_lock<mutex> g(mtx_retains_);
  1751. retains_.find(
  1752. e.topic_filter,
  1753. [&](retain_t const& r) {
  1754. retain_deliver.emplace_back(
  1755. [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
  1756. publish_proc(r, qos_value, sid);
  1757. }
  1758. );
  1759. }
  1760. );
  1761. },
  1762. sid
  1763. );
  1764. }
  1765. else {
  1766. // User not authorized to subscribe to topic filter
  1767. res.emplace_back(v5::suback_reason_code::not_authorized);
  1768. }
  1769. }
  1770. if (h_subscribe_props_) h_subscribe_props_(props);
  1771. // Acknowledge the subscriptions, and the registered QOS settings
  1772. ep.async_suback(
  1773. packet_id,
  1774. force_move(res),
  1775. suback_props_,
  1776. [spep = force_move(spep)]
  1777. (error_code ec) {
  1778. if (ec) {
  1779. MQTT_LOG("mqtt_broker", info)
  1780. << MQTT_ADD_VALUE(address, spep.get())
  1781. << ec.message();
  1782. }
  1783. }
  1784. );
  1785. } break;
  1786. default:
  1787. BOOST_ASSERT(false);
  1788. break;
  1789. }
  1790. for (auto const& f : retain_deliver) {
  1791. f();
  1792. }
  1793. return true;
  1794. }
  1795. bool unsubscribe_handler(
  1796. con_sp_t spep,
  1797. packet_id_t packet_id,
  1798. std::vector<unsubscribe_entry> entries,
  1799. v5::properties props) {
  1800. auto& ep = *spep;
  1801. std::shared_lock<mutex> g(mtx_sessions_);
  1802. auto& idx = sessions_.get<tag_con>();
  1803. auto it = idx.find(spep);
  1804. // broker uses async_* APIs
  1805. // If broker erase a connection, then async_force_disconnect()
  1806. // and/or async_force_disconnect () is called.
  1807. // During async operation, spep is valid but it has already been
  1808. // erased from sessions_
  1809. if (it == idx.end()) return true;
  1810. // The element of sessions_ must have longer lifetime
  1811. // than corresponding subscription.
  1812. // Because the subscription store the reference of the element.
  1813. optional<session_state_ref> ssr_opt;
  1814. // const_cast is appropriate here
  1815. // See https://github.com/boostorg/multi_index/issues/50
  1816. auto& ss = const_cast<session_state&>(*it);
  1817. ssr_opt.emplace(ss);
  1818. BOOST_ASSERT(ssr_opt);
  1819. session_state_ref ssr {ssr_opt.value()};
  1820. // For each subscription that this connection has
  1821. // Compare against the list of topic filters, and remove
  1822. // the subscription if the topic filter is in the list.
  1823. for (auto const& e : entries) {
  1824. ssr.get().unsubscribe(e.share_name, e.topic_filter);
  1825. }
  1826. switch (ep.get_protocol_version()) {
  1827. case protocol_version::v3_1_1:
  1828. ep.async_unsuback(
  1829. packet_id,
  1830. [spep = force_move(spep)]
  1831. (error_code ec) {
  1832. if (ec) {
  1833. MQTT_LOG("mqtt_broker", info)
  1834. << MQTT_ADD_VALUE(address, spep.get())
  1835. << ec.message();
  1836. }
  1837. }
  1838. );
  1839. break;
  1840. case protocol_version::v5:
  1841. if (h_unsubscribe_props_) h_unsubscribe_props_(props);
  1842. ep.async_unsuback(
  1843. packet_id,
  1844. std::vector<v5::unsuback_reason_code>(
  1845. entries.size(),
  1846. v5::unsuback_reason_code::success
  1847. ),
  1848. unsuback_props_,
  1849. [spep = force_move(spep)]
  1850. (error_code ec) {
  1851. if (ec) {
  1852. MQTT_LOG("mqtt_broker", info)
  1853. << MQTT_ADD_VALUE(address, spep.get())
  1854. << ec.message();
  1855. }
  1856. }
  1857. );
  1858. break;
  1859. default:
  1860. BOOST_ASSERT(false);
  1861. break;
  1862. }
  1863. return true;
  1864. }
  1865. /**
  1866. * @brief do_publish Publish a message to any subscribed clients.
  1867. *
  1868. * @param source_ss - soource session_state.
  1869. * @param topic - The topic to publish the message on.
  1870. * @param contents - The contents of the message.
  1871. * @param pubopts - publish options
  1872. * @param props - properties
  1873. */
  1874. void do_publish(
  1875. session_state const& source_ss,
  1876. buffer topic,
  1877. buffer contents,
  1878. publish_options pubopts,
  1879. v5::properties props
  1880. ) {
  1881. // Get auth rights for this topic
  1882. // auth_users prepared once here, and then referred multiple times in subs_map_.modify() for efficiency
  1883. auto auth_users = security.auth_sub(topic);
  1884. // publish the message to subscribers.
  1885. // retain is delivered as the original only if rap_value is rap::retain.
  1886. // On MQTT v3.1.1, rap_value is always rap::dont.
  1887. auto deliver =
  1888. [&] (session_state& ss, subscription& sub, auto const& auth_users) {
  1889. // See if this session is authorized to subscribe this topic
  1890. auto access = security.auth_sub_user(auth_users, ss.get_username());
  1891. if (access != security::authorization::type::allow) return;
  1892. publish_options new_pubopts = std::min(pubopts.get_qos(), sub.subopts.get_qos());
  1893. if (sub.subopts.get_rap() == rap::retain && pubopts.get_retain() == MQTT_NS::retain::yes) {
  1894. new_pubopts |= MQTT_NS::retain::yes;
  1895. }
  1896. if (sub.sid) {
  1897. props.push_back(v5::property::subscription_identifier(sub.sid.value()));
  1898. ss.deliver(
  1899. timer_ioc_,
  1900. topic,
  1901. contents,
  1902. new_pubopts,
  1903. props
  1904. );
  1905. props.pop_back();
  1906. }
  1907. else {
  1908. ss.deliver(
  1909. timer_ioc_,
  1910. topic,
  1911. contents,
  1912. new_pubopts,
  1913. props
  1914. );
  1915. }
  1916. };
  1917. // share_name topic_filter
  1918. std::set<std::tuple<string_view, string_view>> sent;
  1919. {
  1920. std::shared_lock<mutex> g{mtx_subs_map_};
  1921. subs_map_.modify(
  1922. topic,
  1923. [&](buffer const& /*key*/, subscription& sub) {
  1924. if (sub.share_name.empty()) {
  1925. // Non shared subscriptions
  1926. // If NL (no local) subscription option is set and
  1927. // publisher is the same as subscriber, then skip it.
  1928. if (sub.subopts.get_nl() == nl::yes &&
  1929. sub.ss.get().client_id() == source_ss.client_id()) return;
  1930. deliver(sub.ss.get(), sub, auth_users);
  1931. }
  1932. else {
  1933. // Shared subscriptions
  1934. bool inserted;
  1935. std::tie(std::ignore, inserted) = sent.emplace(sub.share_name, sub.topic_filter);
  1936. if (inserted) {
  1937. if (auto ssr_opt = shared_targets_.get_target(sub.share_name, sub.topic_filter)) {
  1938. deliver(ssr_opt.value().get(), sub, auth_users);
  1939. }
  1940. }
  1941. }
  1942. }
  1943. );
  1944. }
  1945. optional<std::chrono::steady_clock::duration> message_expiry_interval;
  1946. if (source_ss.get_protocol_version() == protocol_version::v5) {
  1947. auto v = get_property<v5::property::message_expiry_interval>(props);
  1948. if (v) {
  1949. message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
  1950. }
  1951. }
  1952. /*
  1953. * If the message is marked as being retained, then we
  1954. * keep it in case a new subscription is added that matches
  1955. * this topic.
  1956. *
  1957. * @note: The MQTT standard 3.3.1.3 RETAIN makes it clear that
  1958. * retained messages are global based on the topic, and
  1959. * are not scoped by the client id. So any client may
  1960. * publish a retained message on any topic, and the most
  1961. * recently published retained message on a particular
  1962. * topic is the message that is stored on the server.
  1963. *
  1964. * @note: The standard doesn't make it clear that publishing
  1965. * a message with zero length, but the retain flag not
  1966. * set, does not result in any existing retained message
  1967. * being removed. However, internet searching indicates
  1968. * that most brokers have opted to keep retained messages
  1969. * when receiving contents of zero bytes, unless the so
  1970. * received message has the retain flag set, in which case
  1971. * the retained message is removed.
  1972. */
  1973. if (pubopts.get_retain() == MQTT_NS::retain::yes) {
  1974. if (contents.empty()) {
  1975. std::lock_guard<mutex> g(mtx_retains_);
  1976. retains_.erase(topic);
  1977. }
  1978. else {
  1979. std::shared_ptr<as::steady_timer> tim_message_expiry;
  1980. if (message_expiry_interval) {
  1981. tim_message_expiry = std::make_shared<as::steady_timer>(timer_ioc_, message_expiry_interval.value());
  1982. tim_message_expiry->async_wait(
  1983. [this, topic = topic, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]
  1984. (boost::system::error_code const& ec) {
  1985. if (auto sp = wp.lock()) {
  1986. if (!ec) {
  1987. retains_.erase(topic);
  1988. }
  1989. }
  1990. }
  1991. );
  1992. }
  1993. std::lock_guard<mutex> g(mtx_retains_);
  1994. retains_.insert_or_assign(
  1995. topic,
  1996. retain_t {
  1997. force_move(topic),
  1998. force_move(contents),
  1999. force_move(props),
  2000. pubopts.get_qos(),
  2001. tim_message_expiry
  2002. }
  2003. );
  2004. }
  2005. }
  2006. }
  2007. private:
  2008. as::io_context& timer_ioc_; ///< The boost asio context to run this broker on. do_publish() creates retained-message expiry timers on it and passes it to session_state::deliver().
  2009. as::steady_timer tim_disconnect_; ///< Used to delay disconnect handling for testing
  2010. optional<std::chrono::steady_clock::duration> delay_disconnect_; ///< Used to delay disconnect handling for testing
  2011. // Authorization and authentication settings (queried for subscribe authorization and per-topic publish delivery)
  2012. broker::security security;
  2013. mutable mutex mtx_subs_map_; ///< Taken (shared) by do_publish() while it walks subs_map_
  2014. sub_con_map subs_map_; ///< subscription information
  2015. shared_target shared_targets_; ///< shared subscription targets
  2016. /// Map of active client id and connections.
  2017. /// session_state (the element type of sessions_) holds references to subs_map_ and shared_targets_.
  2018. /// NOTE(review): presumably that is why those two members are declared before sessions_ - confirm.
  2019. mutable mutex mtx_sessions_; ///< Taken (shared) by unsubscribe_handler() before it looks up sessions_
  2020. session_states sessions_;
  2021. mutable mutex mtx_retains_; ///< Locked around retains_ find / insert_or_assign / erase in the subscribe and publish paths
  2022. retained_messages retains_; ///< A list of messages retained so they can be sent to newly subscribed clients.
  2023. // MQTTv5 members: properties the broker attaches to its own packets (suback_props_ and unsuback_props_ are sent with SUBACK / UNSUBACK; the others presumably with the packet they are named after - confirm)
  2024. v5::properties connack_props_;
  2025. v5::properties suback_props_;
  2026. v5::properties unsuback_props_;
  2027. v5::properties puback_props_;
  2028. v5::properties pubrec_props_;
  2029. v5::properties pubrel_props_;
  2030. v5::properties pubcomp_props_;
  2031. std::function<void(v5::properties const&)> h_connect_props_; // Optional property callbacks start here; presumably each receives the properties of the received packet it is named after - confirm
  2032. std::function<void(v5::properties const&)> h_disconnect_props_;
  2033. std::function<void(v5::properties const&)> h_publish_props_;
  2034. std::function<void(v5::properties const&)> h_puback_props_;
  2035. std::function<void(v5::properties const&)> h_pubrec_props_;
  2036. std::function<void(v5::properties const&)> h_pubrel_props_;
  2037. std::function<void(v5::properties const&)> h_pubcomp_props_;
  2038. std::function<void(v5::properties const&)> h_subscribe_props_; // Called (if set) with the SUBSCRIBE properties of a v5 connection
  2039. std::function<void(v5::properties const&)> h_unsubscribe_props_; // Called (if set) with the UNSUBSCRIBE properties of a v5 connection
  2040. std::function<void(v5::properties const&)> h_auth_props_;
  2041. bool pingresp_ = true; // NOTE(review): presumably enables sending PINGRESP; usage is outside this section - confirm
  2042. bool connack_ = true; // NOTE(review): presumably enables sending CONNACK; usage is outside this section - confirm
  2043. };
  2044. MQTT_BROKER_NS_END
  2045. #endif // MQTT_BROKER_BROKER_HPP