1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194 |
- // Copyright Takatoshi Kondo 2017
- //
- // Distributed under the Boost Software License, Version 1.0.
- // (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- #if !defined(MQTT_BROKER_BROKER_HPP)
- #define MQTT_BROKER_BROKER_HPP
- #include <mqtt/config.hpp>
- #include <set>
- #include <boost/lexical_cast.hpp>
- #include <mqtt/broker/broker_namespace.hpp>
- #include <mqtt/optional.hpp>
- #include <mqtt/property.hpp>
- #include <mqtt/visitor_util.hpp>
- #include <mqtt/broker/session_state.hpp>
- #include <mqtt/broker/sub_con_map.hpp>
- #include <mqtt/broker/retained_messages.hpp>
- #include <mqtt/broker/retained_topic_map.hpp>
- #include <mqtt/broker/shared_target_impl.hpp>
- #include <mqtt/broker/mutex.hpp>
- #include <mqtt/broker/uuid.hpp>
- #include <mqtt/broker/constant.hpp>
- #include <mqtt/broker/security.hpp>
- MQTT_BROKER_NS_BEGIN
- namespace mi = boost::multi_index;
- namespace as = boost::asio;
- class broker_t {
- public:
- broker_t(as::io_context& timer_ioc)
- :timer_ioc_(timer_ioc),
- tim_disconnect_(timer_ioc_) {
- security.default_config();
- }
- // [begin] for test setting
- /**
- * @brief set_disconnect_delay adds a delay to disconnect operations.
- *
- * This makes the broker wait the specified amount between when a disconnect
- * is received from a client, and when the connection is actually closed in
- * the broker.
- *
- * @param delay - the amount to delay by
- */
- void set_disconnect_delay(std::chrono::steady_clock::duration delay) {
- delay_disconnect_ = force_move(delay);
- }
- /**
- * @brief set pingresp send operaton
- *
- * @param b - if true, send pingresp when pingreq is received.
- * if false, doesn't send pingresp for test.
- */
- void set_pingresp(bool b) {
- pingresp_ = b;
- }
- /**
- * @brief set pingresp send operaton
- *
- * @param b - if true, send connack when connect is received.
- * if false, doesn't send connack for test.
- */
- void set_connack(bool b) {
- connack_ = b;
- }
- /**
- * @brief configure the security settings
- */
- void set_security(broker::security&& security) {
- this->security = force_move(security);
- }
- // [end] for test setting
- /**
- * @brief handle_accept
- *
- * Call this function when an server (of whatever kind) has accepted a raw
- * connection from an MQTT client. By 'raw connection', this might be raw TCP sockets
- * or websockets, or completed a TLS handshake, or any other underlying transport
- * type, but what is not meant is that the mqtt client on the other end of the endpoint
- * has initiated the MQTT application protocol connection sequence with CONNECT or CONACK
- * messages being sent or received.
- *
- * This function will assign several event handlers into server (of whatever kind)
- * that is provided as a parameter. This includes connection handlers, disconnection handlers
- * and various handlers for a variety of of MQTT message types.
- *
- * @param ep - The server (of whichever kind) to accept a connection on.
- */
- void handle_accept(con_sp_t spep) {
- con_wp_t wp(spep);
- endpoint_t& ep = *spep;
- ep.socket().lowest_layer().set_option(as::ip::tcp::no_delay(true));
- ep.set_auto_pub_response(false);
- ep.set_async_operation(true);
- ep.set_topic_alias_maximum(MQTT_NS::topic_alias_max);
- // set connection (lower than MQTT) level handlers
- ep.set_close_handler(
- [this, wp]
- (){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- close_proc(force_move(sp), true);
- });
- ep.set_error_handler(
- [this, wp]
- (error_code ec){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto ver = sp->get_protocol_version();
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, this)
- << " error_handler is called. ec:" << ec.message() << " protocol_version:" << ver;
- auto send_response =
- [&](auto ec) {
- if (sp->connected()) {
- auto rc =
- [&] () -> MQTT_NS::optional<v5::disconnect_reason_code> {
- if (ec == boost::system::errc::protocol_error) {
- return MQTT_NS::v5::disconnect_reason_code::protocol_error;
- }
- else if (ec == boost::system::errc::bad_message) {
- return MQTT_NS::v5::disconnect_reason_code::malformed_packet;
- }
- return MQTT_NS::nullopt;
- }();
- if (rc) {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "send DISCONNECT reason_code:" << rc.value();
- sp->async_disconnect(
- rc.value(),
- v5::properties{},
- [sp]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, sp.get())
- << ec.message();
- }
- }
- );
- }
- }
- else if (sp->underlying_connected()){
- // underlying layer connected, mqtt connecting
- // and protocol_version has already been determind as v5
- auto rc =
- [&] () -> MQTT_NS::optional<v5::connect_reason_code> {
- if (ec ==boost::system::errc::protocol_error) {
- return MQTT_NS::v5::connect_reason_code::protocol_error;
- }
- else if (ec == boost::system::errc::bad_message) {
- return MQTT_NS::v5::connect_reason_code::malformed_packet;
- }
- return MQTT_NS::nullopt;
- }();
- if (rc) {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "send CONNACK reason_code:" << rc.value();
- if (connack_) sp->async_connack(
- false,
- rc.value(),
- [sp]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, sp.get())
- << ec.message();
- }
- }
- );
- }
- }
- };
- switch (ver) {
- case MQTT_NS::protocol_version::v5:
- // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#S4_13_Errors
- // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205
- //
- // The DISCONNECT packet is the final MQTT Control Packet sent from the Client or
- // the Server.
- send_response(ec);
- break;
- case MQTT_NS::protocol_version::v3_1_1:
- // DISCONNECT can't be sent by broker on v3.1.1
- //
- // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090
- //
- // The DISCONNECT Packet is the final Control Packet sent from the Client to the Server.
- // It indicates that the Client is disconnecting cleanly.
- //
- // At the MQTT connecting, there is no appropriate Connect Return Code on v3.1.1
- // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718035
- break;
- default:
- // The protocol_version is in the CONNECT packet.
- // Protocol error could happen before the protocol_version is parsed.
- break;
- }
- close_proc(force_move(sp), true);
- }
- );
- // set MQTT level handlers
- ep.set_connect_handler(
- [this, wp]
- (buffer client_id,
- optional<buffer> username,
- optional<buffer> password,
- optional<will> will,
- bool clean_session,
- std::uint16_t keep_alive) {
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return connect_handler(
- force_move(sp),
- force_move(client_id),
- force_move(username),
- force_move(password),
- force_move(will),
- clean_session,
- keep_alive,
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_connect_handler(
- [this, wp]
- (buffer client_id,
- optional<buffer> username,
- optional<buffer> password,
- optional<will> will,
- bool clean_start,
- std::uint16_t keep_alive,
- v5::properties props) {
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return connect_handler(
- force_move(sp),
- force_move(client_id),
- force_move(username),
- force_move(password),
- force_move(will),
- clean_start,
- keep_alive,
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_disconnect_handler(
- [this, wp]
- (){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- disconnect_handler(force_move(sp));
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- }
- }
- );
- ep.set_v5_disconnect_handler(
- [this, wp]
- (v5::disconnect_reason_code /*reason_code*/, v5::properties props) {
- if (h_disconnect_props_) h_disconnect_props_(force_move(props));
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- disconnect_handler(force_move(sp));
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- }
- }
- );
- ep.set_puback_handler(
- [this, wp]
- (packet_id_t packet_id){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return puback_handler(
- force_move(sp),
- packet_id,
- v5::puback_reason_code::success,
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_puback_handler(
- [this, wp]
- (packet_id_t packet_id,
- v5::puback_reason_code reason_code,
- v5::properties props){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return puback_handler(
- force_move(sp),
- packet_id,
- reason_code,
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_pubrec_handler(
- [this, wp]
- (packet_id_t packet_id){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return pubrec_handler(
- force_move(sp),
- packet_id,
- v5::pubrec_reason_code::success,
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_pubrec_handler(
- [this, wp]
- (packet_id_t packet_id,
- v5::pubrec_reason_code reason_code,
- v5::properties props){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return pubrec_handler(
- force_move(sp),
- packet_id,
- reason_code,
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_pubrel_handler(
- [this, wp]
- (packet_id_t packet_id){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return pubrel_handler(
- force_move(sp),
- packet_id,
- v5::pubrel_reason_code::success,
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_pubrel_handler(
- [this, wp]
- (packet_id_t packet_id,
- v5::pubrel_reason_code reason_code,
- v5::properties props){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return pubrel_handler(
- force_move(sp),
- packet_id,
- reason_code,
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_pubcomp_handler(
- [this, wp]
- (packet_id_t packet_id){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return pubcomp_handler(
- force_move(sp),
- packet_id,
- v5::pubcomp_reason_code::success,
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_pubcomp_handler(
- [this, wp]
- (packet_id_t packet_id,
- v5::pubcomp_reason_code reason_code,
- v5::properties props){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return pubcomp_handler(
- force_move(sp),
- packet_id,
- reason_code,
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_publish_handler(
- [this, wp]
- (optional<packet_id_t> packet_id,
- publish_options pubopts,
- buffer topic_name,
- buffer contents){
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return publish_handler(
- force_move(sp),
- packet_id,
- pubopts,
- force_move(topic_name),
- force_move(contents),
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_publish_handler(
- [this, wp]
- (optional<packet_id_t> packet_id,
- publish_options pubopts,
- buffer topic_name,
- buffer contents,
- v5::properties props
- ) {
- if (h_publish_props_) h_publish_props_(props);
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return publish_handler(
- force_move(sp),
- packet_id,
- pubopts,
- force_move(topic_name),
- force_move(contents),
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_subscribe_handler(
- [this, wp]
- (packet_id_t packet_id,
- std::vector<subscribe_entry> entries) {
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return subscribe_handler(
- force_move(sp),
- packet_id,
- force_move(entries),
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_subscribe_handler(
- [this, wp]
- (packet_id_t packet_id,
- std::vector<subscribe_entry> entries,
- v5::properties props
- ) {
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return subscribe_handler(
- force_move(sp),
- packet_id,
- force_move(entries),
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_unsubscribe_handler(
- [this, wp]
- (packet_id_t packet_id,
- std::vector<unsubscribe_entry> entries) {
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return unsubscribe_handler(
- force_move(sp),
- packet_id,
- force_move(entries),
- v5::properties{}
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_v5_unsubscribe_handler(
- [this, wp]
- (packet_id_t packet_id,
- std::vector<unsubscribe_entry> entries,
- v5::properties props
- ) {
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- auto p = sp.get();
- try {
- return unsubscribe_handler(
- force_move(sp),
- packet_id,
- force_move(entries),
- force_move(props)
- );
- }
- catch (std::exception const& ex) {
- MQTT_LOG("mqtt_broker", error)
- << MQTT_ADD_VALUE(address, p)
- << ex.what();
- return true;
- }
- }
- );
- ep.set_pingreq_handler(
- [this, wp] {
- con_sp_t sp = wp.lock();
- BOOST_ASSERT(sp);
- if (pingresp_) {
- auto p = sp.get();
- p->async_pingresp(
- [sp = force_move(sp)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, sp.get())
- << ec.message();
- }
- }
- );
- }
- return true;
- }
- );
- ep.set_v5_auth_handler(
- [this]
- (v5::auth_reason_code /*reason_code*/,
- v5::properties props
- ) {
- if (h_auth_props_) h_auth_props_(force_move(props));
- return true;
- }
- );
- // Pass spep to keep lifetime.
- // It makes sure wp.lock() never return nullptr in the handlers below
- // including close_handler and error_handler.
- ep.start_session(spep);
- }
- void set_connack_props(v5::properties props) {
- connack_props_ = force_move(props);
- }
- void set_suback_props(v5::properties props) {
- suback_props_ = force_move(props);
- }
- void set_unsuback_props(v5::properties props) {
- unsuback_props_ = force_move(props);
- }
- void set_puback_props(v5::properties props) {
- puback_props_ = force_move(props);
- }
- void set_pubrec_props(v5::properties props) {
- pubrec_props_ = force_move(props);
- }
- void set_pubrel_props(v5::properties props) {
- pubrel_props_ = force_move(props);
- }
- void set_pubcomp_props(v5::properties props) {
- pubcomp_props_ = force_move(props);
- }
- void set_connect_props_handler(std::function<void(v5::properties const&)> h) {
- h_connect_props_ = force_move(h);
- }
- void set_disconnect_props_handler(std::function<void(v5::properties const&)> h) {
- h_disconnect_props_ = force_move(h);
- }
- void set_publish_props_handler(std::function<void(v5::properties const&)> h) {
- h_publish_props_ = force_move(h);
- }
- void set_puback_props_handler(std::function<void(v5::properties const&)> h) {
- h_puback_props_ = force_move(h);
- }
- void set_pubrec_props_handler(std::function<void(v5::properties const&)> h) {
- h_pubrec_props_ = force_move(h);
- }
- void set_pubrel_props_handler(std::function<void(v5::properties const&)> h) {
- h_pubrel_props_ = force_move(h);
- }
- void set_pubcomp_props_handler(std::function<void(v5::properties const&)> h) {
- h_pubcomp_props_ = force_move(h);
- }
- void set_subscribe_props_handler(std::function<void(v5::properties const&)> h) {
- h_subscribe_props_ = force_move(h);
- }
- void set_unsubscribe_props_handler(std::function<void(v5::properties const&)> h) {
- h_unsubscribe_props_ = force_move(h);
- }
- void set_auth_props_handler(std::function<void(v5::properties const&)> h) {
- h_auth_props_ = force_move(h);
- }
- void clear_all_sessions() {
- std::lock_guard<mutex> g(mtx_sessions_);
- sessions_.clear();
- }
- void clear_all_retained_topics() {
- std::lock_guard<mutex> g(mtx_retains_);
- retains_.clear();
- }
- private:
- static void force_disconnect(con_sp_t spep) {
- auto p = spep.get();
- p->async_force_disconnect(
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- };
- static void disconnect_and_force_disconnect(con_sp_t spep, v5::disconnect_reason_code rc) {
- auto p = spep.get();
- p->async_disconnect(
- rc,
- v5::properties{},
- [spep = force_move(spep)]
- (error_code) mutable {
- force_disconnect(force_move(spep));
- }
- );
- };
- /**
- * @brief connect_proc Process an incoming CONNECT packet
- *
- * This is called by the connect_handler function, which is registered
- * on mqtt connections where the raw transport (tcp / tls / websocket / etc)
- * is established, but the CONNECT message has not been sent / received by
- * the mqtt client on the other end of the connection.
- *
- * When the CONNECT message is received, this function is called after some
- * basic pre-connection logic, to setup the record keeping that this broker
- * class needs to handle the connection and process subscriptions and publishing.
- *
- * @param clean_start - if the clean-start flag is set on the CONNECT message.
- * @param spep - varient of shared pointers to underlying connection type.
- * @param req_client_id - the id that the client wants to use (username will be prepended)
- * @param will - the last-will-and-testiment of the connection, if any.
- */
- bool connect_handler(
- con_sp_t spep,
- buffer client_id,
- optional<buffer> noauth_username,
- optional<buffer> password,
- optional<will> will,
- bool clean_start,
- std::uint16_t /*keep_alive*/,
- v5::properties props
- ) {
- auto& ep = *spep;
- optional<std::string> username;
- if (ep.get_preauthed_user_name()) {
- if (security.login_cert(ep.get_preauthed_user_name().value())) {
- username = ep.get_preauthed_user_name();
- }
- }
- else if (!noauth_username && !password) {
- username = security.login_anonymous();
- }
- else if (noauth_username && password) {
- username = security.login(*noauth_username, *password);
- }
- // If login fails, try the unauthenticated user
- if (!username) username = security.login_unauthenticated();
- v5::properties connack_props;
- connect_param cp = handle_connect_props(ep, props, will);
- if (!username) {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "User failed to login: "
- << (noauth_username ? std::string(*noauth_username) : std::string("anonymous user"));
- send_connack(
- ep,
- false, // session present
- false, // authenticated
- force_move(connack_props),
- [spep](error_code) {
- disconnect_and_force_disconnect(spep, v5::disconnect_reason_code::not_authorized);
- }
- );
- return true;
- }
- if (client_id.empty()) {
- if (!handle_empty_client_id(spep, client_id, clean_start, connack_props)) {
- return false;
- }
- // A new client id was generated
- client_id = buffer(string_view(spep->get_client_id()));
- }
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "User logged in as: '" << *username << "', client_id: " << client_id;
- /**
- * http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345311
- * 3.1.2.4 Clean Start
- * If a CONNECT packet is received with Clean Start is set to 1, the Client and Server MUST
- * discard any existing Session and start a new Session [MQTT-3.1.2-4]. Consequently,
- * the Session Present flag in CONNACK is always set to 0 if Clean Start is set to 1.
- */
- // Find any sessions that have the same client_id
- std::lock_guard<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_cid>();
- auto it = idx.lower_bound(std::make_tuple(*username, client_id));
- if (it == idx.end() ||
- it->client_id() != client_id ||
- it->get_username() != *username
- ) {
- // new connection
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "cid:" << client_id
- << " new connection inserted.";
- it = idx.emplace_hint(
- it,
- timer_ioc_,
- mtx_subs_map_,
- subs_map_,
- shared_targets_,
- spep,
- client_id,
- *username,
- force_move(will),
- // will_sender
- [this](auto&&... params) {
- do_publish(std::forward<decltype(params)>(params)...);
- },
- force_move(cp.will_expiry_interval),
- force_move(cp.session_expiry_interval)
- );
- if (cp.response_topic_requested) {
- // set_response_topic never modify key part
- set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
- }
- send_connack(
- ep,
- false, // session present
- true, // authenticated
- force_move(connack_props)
- );
- }
- else if (it->online()) {
- // online overwrite
- if (close_proc_no_lock(it->con(), true, v5::disconnect_reason_code::session_taken_over)) {
- // remain offline
- if (clean_start) {
- // discard offline session
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "cid:" << client_id
- << "online connection exists, discard old one due to new one's clean_start and renew";
- if (cp.response_topic_requested) {
- // set_response_topic never modify key part
- set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
- }
- send_connack(
- ep,
- false, // session present
- true, // authenticated
- force_move(connack_props)
- );
- idx.modify(
- it,
- [&](auto& e) {
- e.clean();
- e.update_will(timer_ioc_, force_move(will), cp.will_expiry_interval);
- e.set_username(*username);
- // renew_session_expiry updates index
- e.renew_session_expiry(force_move(cp.session_expiry_interval));
- },
- [](auto&) { BOOST_ASSERT(false); }
- );
- }
- else {
- // inherit online session if previous session's session exists
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "cid:" << client_id
- << "online connection exists, inherit old one and renew";
- if (cp.response_topic_requested) {
- // set_response_topic never modify key part
- set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
- }
- send_connack(
- ep,
- true, // session present
- true, // authenticated
- force_move(connack_props),
- [
- this,
- &idx,
- it,
- will = force_move(will),
- clean_start,
- spep,
- will_expiry_interval = cp.will_expiry_interval,
- session_expiry_interval = cp.session_expiry_interval,
- username
- ]
- (error_code ec) mutable {
- if (ec) {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << ec.message();
- return;
- }
- idx.modify(
- it,
- [&](auto& e) {
- e.renew(spep, clean_start);
- e.set_username(*username);
- e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
- // renew_session_expiry updates index
- e.renew_session_expiry(force_move(session_expiry_interval));
- e.send_inflight_messages();
- e.send_all_offline_messages();
- },
- [](auto&) { BOOST_ASSERT(false); }
- );
- }
- );
- }
- }
- else {
- // new connection
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "cid:" << client_id
- << "online connection exists, discard old one due to session_expiry and renew";
- bool inserted;
- std::tie(it, inserted) = idx.emplace(
- timer_ioc_,
- mtx_subs_map_,
- subs_map_,
- shared_targets_,
- spep,
- client_id,
- *username,
- force_move(will),
- // will_sender
- [this](auto&&... params) {
- do_publish(std::forward<decltype(params)>(params)...);
- },
- force_move(cp.will_expiry_interval),
- force_move(cp.session_expiry_interval)
- );
- BOOST_ASSERT(inserted);
- if (cp.response_topic_requested) {
- // set_response_topic never modify key part
- set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
- }
- send_connack(
- ep,
- false, // session present
- true, // authenticated
- force_move(connack_props)
- );
- }
- }
- else {
- // offline -> online
- if (clean_start) {
- // discard offline session
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "cid:" << client_id
- << "offline connection exists, discard old one due to new one's clean_start and renew";
- if (cp.response_topic_requested) {
- // set_response_topic never modify key part
- set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
- }
- send_connack(
- ep,
- false, // session present
- true, // authenticated
- force_move(connack_props)
- );
- idx.modify(
- it,
- [&](auto& e) {
- e.clean();
- e.renew(spep, clean_start);
- e.update_will(timer_ioc_, force_move(will), cp.will_expiry_interval);
- e.set_username(*username);
- // renew_session_expiry updates index
- e.renew_session_expiry(force_move(cp.session_expiry_interval));
- },
- [](auto&) { BOOST_ASSERT(false); }
- );
- }
- else {
- // inherit offline session
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << "cid:" << client_id
- << "offline connection exists, inherit old one and renew";
- if (cp.response_topic_requested) {
- // set_response_topic never modify key part
- set_response_topic(const_cast<session_state&>(*it), connack_props, *username);
- }
- send_connack(
- ep,
- true, // session present
- true, // authenticated
- force_move(connack_props),
- [
- this,
- &idx,
- it,
- will = force_move(will),
- clean_start,
- spep,
- will_expiry_interval = cp.will_expiry_interval,
- session_expiry_interval = cp.session_expiry_interval,
- username
- ]
- (error_code ec) mutable {
- if (ec) {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, this)
- << ec.message();
- return;
- }
- idx.modify(
- it,
- [&](auto& e) {
- e.renew(spep, clean_start);
- e.set_username(*username);
- e.update_will(timer_ioc_, force_move(will), will_expiry_interval);
- // renew_session_expiry updates index
- e.renew_session_expiry(force_move(session_expiry_interval));
- e.send_inflight_messages();
- e.send_all_offline_messages();
- },
- [](auto&) { BOOST_ASSERT(false); }
- );
- }
- );
- }
- }
- return true;
- }
- struct connect_param {
- optional<std::chrono::steady_clock::duration> session_expiry_interval;
- optional<std::chrono::steady_clock::duration> will_expiry_interval;
- bool response_topic_requested = false;
- };
- connect_param handle_connect_props(
- endpoint_t& ep,
- v5::properties const& props,
- optional<will> const& will
- ) {
- connect_param cp;
- if (ep.get_protocol_version() == protocol_version::v5) {
- {
- auto v = get_property<v5::property::session_expiry_interval>(props);
- if (v && v.value().val() != 0) {
- cp.session_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
- }
- }
- {
- auto v = get_property<v5::property::request_response_information>(props);
- if (v && v.value().val() == 1) {
- cp.response_topic_requested = true;
- }
- }
- if (will) {
- auto v = get_property<v5::property::message_expiry_interval>(will.value().props());
- if (v) {
- cp.will_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
- }
- }
- if (h_connect_props_) {
- h_connect_props_(props);
- }
- }
- return cp;
- }
- void send_connack(
- endpoint_t& ep,
- bool session_present,
- bool authenticated,
- v5::properties props,
- std::function<void(error_code)> finish = [](error_code){}
- ) {
- // Reply to the connect message.
- switch (ep.get_protocol_version()) {
- case protocol_version::v3_1_1:
- if (connack_) ep.async_connack(
- session_present,
- authenticated ? connect_return_code::accepted
- : connect_return_code::not_authorized,
- [finish = force_move(finish)]
- (error_code ec) {
- finish(ec);
- }
- );
- break;
- case protocol_version::v5:
- // connack_props_ member varible is for testing
- if (connack_props_.empty()) {
- // props local variable is is for real case
- props.emplace_back(v5::property::topic_alias_maximum{topic_alias_max});
- props.emplace_back(v5::property::receive_maximum{receive_maximum_max});
- if (connack_) ep.async_connack(
- session_present,
- authenticated ? v5::connect_reason_code::success
- : v5::connect_reason_code::not_authorized,
- force_move(props),
- [finish = force_move(finish)]
- (error_code ec) {
- finish(ec);
- }
- );
- }
- else {
- // use connack_props_ for testing
- if (connack_) ep.async_connack(
- session_present,
- authenticated ? v5::connect_reason_code::success
- : v5::connect_reason_code::not_authorized,
- connack_props_,
- [finish = force_move(finish)]
- (error_code ec) {
- finish(ec);
- }
- );
- }
- break;
- default:
- BOOST_ASSERT(false);
- break;
- }
- }
- void remove_rule(std::size_t rule_nr) {
- security.remove_auth(rule_nr);
- }
- void set_response_topic(session_state& s, v5::properties& connack_props, std::string const &username) {
- auto response_topic =
- [&] {
- if (auto rt_opt = s.get_response_topic()) {
- return rt_opt.value();
- }
- auto rt = create_uuid_string();
- s.set_response_topic(rt);
- return rt;
- } ();
- auto rule_nr = security.add_auth(
- response_topic,
- { "@any" }, MQTT_NS::broker::security::authorization::type::allow,
- { username }, MQTT_NS::broker::security::authorization::type::allow
- );
- s.set_clean_handler(
- [this, response_topic, rule_nr]() {
- std::lock_guard<mutex> g(mtx_retains_);
- retains_.erase(response_topic);
- remove_rule(rule_nr);
- }
- );
- connack_props.emplace_back(
- v5::property::response_topic(
- allocate_buffer(response_topic)
- )
- );
- }
- bool handle_empty_client_id(
- con_sp_t spep,
- buffer const& client_id,
- bool clean_start,
- v5::properties& connack_props
- ) {
- auto& ep = *spep;
- switch (ep.get_protocol_version()) {
- case protocol_version::v3_1_1:
- if (client_id.empty()) {
- if (clean_start) {
- ep.set_client_id(create_uuid_string());
- }
- else {
- // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242
- // If the Client supplies a zero-byte ClientId,
- // the Client MUST also set CleanSession to 1 [MQTT-3.1.3-7].
- // If it's a not a clean session, but no client id is provided,
- // we would have no way to map this connection's session to a new connection later.
- // So the connection must be rejected.
- if (connack_) {
- ep.async_connack(
- false,
- connect_return_code::identifier_rejected,
- [&ep, spep = force_move(spep)]
- (error_code ec) mutable {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- ep.async_force_disconnect(
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- }
- );
- }
- return false;
- }
- }
- break;
- case protocol_version::v5:
- if (client_id.empty()) {
- // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
- // A Server MAY allow a Client to supply a ClientID that has a length of zero bytes,
- // however if it does so the Server MUST treat this as a special case and assign a
- // unique ClientID to that Client [MQTT-3.1.3-6]. It MUST then process the
- // CONNECT packet as if the Client had provided that unique ClientID,
- // and MUST return the Assigned Client Identifier in the CONNACK packet [MQTT-3.1.3-7].
- // If the Server rejects the ClientID it MAY respond to the CONNECT packet with a CONNACK
- // using Reason Code 0x85 (Client Identifier not valid) as described in section 4.13
- // Handling errors, and then it MUST close the Network Connection [MQTT-3.1.3-8].
- //
- // mqtt_cpp author's note: On v5.0, no Clean Start restriction is described.
- ep.set_client_id(create_uuid_string());
- connack_props.emplace_back(
- v5::property::assigned_client_identifier(buffer(string_view(ep.get_client_id())))
- );
- }
- break;
- default:
- BOOST_ASSERT(false);
- return false;
- }
- return true;
- }
- void disconnect_handler(
- con_sp_t spep
- ) {
- if (delay_disconnect_) {
- tim_disconnect_.expires_after(delay_disconnect_.value());
- tim_disconnect_.wait();
- }
- close_proc(force_move(spep), false);
- }
- /**
- * @brief close_proc_no_lock - clean up a connection that has been closed.
- *
- * @param ep - The underlying server (of whichever type) that is disconnecting.
- * @param send_will - Whether to publish this connections last will
- * @return true if offline session is remained, otherwise false
- */
- // TODO: Maybe change the name of this function.
- bool close_proc_no_lock(
- con_sp_t spep,
- bool send_will,
- optional<v5::disconnect_reason_code> rc) {
- endpoint_t& ep = *spep;
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // act_sess_it == act_sess_idx.end() could happen if broker accepts
- // the session from client but the client closes the session before sending
- // MQTT `CONNECT` message.
- // In this case, do nothing is correct behavior.
- if (it == idx.end()) return false;
- bool session_clear =
- [&] {
- if (ep.get_protocol_version() == protocol_version::v3_1_1) {
- return ep.clean_session();
- }
- else {
- BOOST_ASSERT(ep.get_protocol_version() == protocol_version::v5);
- auto const& sei_opt = it->session_expiry_interval();
- return !sei_opt || sei_opt.value() == std::chrono::steady_clock::duration::zero();
- }
- } ();
- auto do_send_will =
- [&](session_state& ss) {
- if (send_will) {
- ss.send_will();
- }
- else {
- ss.clear_will();
- }
- };
- if (session_clear) {
- // const_cast is appropriate here
- // See https://github.com/boostorg/multi_index/issues/50
- auto& ss = const_cast<session_state&>(*it);
- do_send_will(ss);
- if (rc) {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, spep.get())
- << "disconnect_and_force_disconnect(async) cid:" << ss.client_id();
- disconnect_and_force_disconnect(spep, rc.value());
- }
- else {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, spep.get())
- << "force_disconnect(async) cid:" << ss.client_id();
- force_disconnect(spep);
- }
- idx.erase(it);
- BOOST_ASSERT(sessions_.get<tag_con>().find(spep) == sessions_.get<tag_con>().end());
- return false;
- }
- else {
- idx.modify(
- it,
- [&](session_state& ss) {
- do_send_will(ss);
- if (rc) {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, spep.get())
- << "disconnect_and_force_disconnect(async) cid:" << ss.client_id();
- disconnect_and_force_disconnect(spep, rc.value());
- }
- else {
- MQTT_LOG("mqtt_broker", trace)
- << MQTT_ADD_VALUE(address, spep.get())
- << "force_disconnect(async) cid:" << ss.client_id();
- force_disconnect(spep);
- }
- // become_offline updates index
- ss.become_offline(
- [this]
- (std::shared_ptr<as::steady_timer> const& sp_tim) {
- sessions_.get<tag_tim>().erase(sp_tim);
- }
- );
- },
- [](auto&) { BOOST_ASSERT(false); }
- );
- return true;
- }
- }
- /**
- * @brief close_proc - clean up a connection that has been closed.
- *
- * @param ep - The underlying server (of whichever type) that is disconnecting.
- * @param send_will - Whether to publish this connections last will
- * @param rc - Reason Code for send pack DISCONNECT
- * @return true if offline session is remained, otherwise false
- */
- // TODO: Maybe change the name of this function.
- bool close_proc(
- con_sp_t spep,
- bool send_will,
- optional<v5::disconnect_reason_code> rc = nullopt
- ) {
- std::lock_guard<mutex> g(mtx_sessions_);
- return close_proc_no_lock(force_move(spep), send_will, rc);
- }
- bool publish_handler(
- con_sp_t spep,
- optional<packet_id_t> packet_id,
- publish_options pubopts,
- buffer topic_name,
- buffer contents,
- v5::properties props) {
- auto& ep = *spep;
- std::shared_lock<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // broker uses async_* APIs
- // If broker erase a connection, then async_force_disconnect()
- // and/or async_force_disconnect () is called.
- // During async operation, spep is valid but it has already been
- // erased from sessions_
- if (it == idx.end()) return true;
- auto send_pubres =
- [&] (bool authorized = true) {
- switch (pubopts.get_qos()) {
- case qos::at_least_once:
- ep.async_puback(
- packet_id.value(),
- authorized ? v5::puback_reason_code::success
- : v5::puback_reason_code::not_authorized,
- puback_props_,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- break;
- case qos::exactly_once: {
- ep.async_pubrec(
- packet_id.value(),
- authorized ? v5::pubrec_reason_code::success
- : v5::pubrec_reason_code::not_authorized,
- pubrec_props_,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- } break;
- default:
- break;
- }
- };
- // See if this session is authorized to publish this topic
- if (security.auth_pub(topic_name, it->get_username()) != security::authorization::type::allow) {
- // Publish not authorized
- send_pubres(false);
- return true;
- }
- v5::properties forward_props;
- for (auto&& p : props) {
- MQTT_NS::visit(
- make_lambda_visitor(
- [](v5::property::topic_alias&&) {
- // TopicAlias is not forwarded
- // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113
- // A receiver MUST NOT carry forward any Topic Alias mappings from
- // one Network Connection to another [MQTT-3.3.2-7].
- },
- [&ep](v5::property::subscription_identifier&& p) {
- MQTT_LOG("mqtt_broker", warning)
- << MQTT_ADD_VALUE(address, &ep)
- << "Subscription Identifier from client not forwarded sid:" << p.val();
- },
- [&forward_props](auto&& p) {
- forward_props.push_back(force_move(p));
- }
- ),
- force_move(p)
- );
- }
- do_publish(
- *it,
- force_move(topic_name),
- force_move(contents),
- pubopts.get_qos() | pubopts.get_retain(), // remove dup flag
- force_move(forward_props)
- );
- send_pubres();
- return true;
- }
- bool puback_handler(
- con_sp_t spep,
- packet_id_t packet_id,
- v5::puback_reason_code /*reason_code*/,
- v5::properties /*props*/) {
- std::shared_lock<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // broker uses async_* APIs
- // If broker erase a connection, then async_force_disconnect()
- // and/or async_force_disconnect () is called.
- // During async operation, spep is valid but it has already been
- // erased from sessions_
- if (it == idx.end()) return true;
- // const_cast is appropriate here
- // See https://github.com/boostorg/multi_index/issues/50
- auto& ss = const_cast<session_state&>(*it);
- ss.erase_inflight_message_by_packet_id(packet_id);
- ss.send_offline_messages_by_packet_id_release();
- return true;
- }
- bool pubrec_handler(
- con_sp_t spep,
- packet_id_t packet_id,
- v5::pubrec_reason_code reason_code,
- v5::properties /*props*/) {
- std::shared_lock<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // broker uses async_* APIs
- // If broker erase a connection, then async_force_disconnect()
- // and/or async_force_disconnect () is called.
- // During async operation, spep is valid but it has already been
- // erased from sessions_
- if (it == idx.end()) return true;
- // const_cast is appropriate here
- // See https://github.com/boostorg/multi_index/issues/50
- auto& ss = const_cast<session_state&>(*it);
- ss.erase_inflight_message_by_packet_id(packet_id);
- if (is_error(reason_code)) return true;
- auto& ep = *spep;
- switch (ep.get_protocol_version()) {
- case protocol_version::v3_1_1:
- ep.async_pubrel(
- packet_id,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- break;
- case protocol_version::v5:
- ep.async_pubrel(
- packet_id,
- v5::pubrel_reason_code::success,
- pubrel_props_,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- break;
- default:
- BOOST_ASSERT(false);
- break;
- }
- return true;
- }
- bool pubrel_handler(
- con_sp_t spep,
- packet_id_t packet_id,
- v5::pubrel_reason_code reason_code,
- v5::properties /*props*/) {
- std::shared_lock<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // broker uses async_* APIs
- // If broker erase a connection, then async_force_disconnect()
- // and/or async_force_disconnect () is called.
- // During async operation, spep is valid but it has already been
- // erased from sessions_
- if (it == idx.end()) return true;
- auto& ep = *spep;
- switch (ep.get_protocol_version()) {
- case protocol_version::v3_1_1:
- ep.async_pubcomp(
- packet_id,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- break;
- case protocol_version::v5:
- ep.async_pubcomp(
- packet_id,
- // pubcomp reason code is the same as pubrel one
- static_cast<v5::pubcomp_reason_code>(reason_code),
- pubcomp_props_,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- break;
- default:
- BOOST_ASSERT(false);
- break;
- }
- return true;
- }
- bool pubcomp_handler(
- con_sp_t spep,
- packet_id_t packet_id,
- v5::pubcomp_reason_code /*reason_code*/,
- v5::properties /*props*/){
- std::shared_lock<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // broker uses async_* APIs
- // If broker erase a connection, then async_force_disconnect()
- // and/or async_force_disconnect () is called.
- // During async operation, spep is valid but it has already been
- // erased from sessions_
- if (it == idx.end()) return true;
- // const_cast is appropriate here
- // See https://github.com/boostorg/multi_index/issues/50
- auto& ss = const_cast<session_state&>(*it);
- ss.erase_inflight_message_by_packet_id(packet_id);
- ss.send_offline_messages_by_packet_id_release();
- return true;
- }
- bool subscribe_handler(
- con_sp_t spep,
- packet_id_t packet_id,
- std::vector<subscribe_entry> entries,
- v5::properties props) {
- auto& ep = *spep;
- std::shared_lock<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // broker uses async_* APIs
- // If broker erase a connection, then async_force_disconnect()
- // and/or async_force_disconnect () is called.
- // During async operation, spep is valid but it has already been
- // erased from sessions_
- if (it == idx.end()) return true;
- // The element of sessions_ must have longer lifetime
- // than corresponding subscription.
- // Because the subscription store the reference of the element.
- optional<session_state_ref> ssr_opt;
- // const_cast is appropriate here
- // See https://github.com/boostorg/multi_index/issues/50
- auto& ss = const_cast<session_state&>(*it);
- ssr_opt.emplace(ss);
- BOOST_ASSERT(ssr_opt);
- session_state_ref ssr {ssr_opt.value()};
- auto publish_proc =
- [this, &ssr](retain_t const& r, qos qos_value, optional<std::size_t> sid) {
- auto props = r.props;
- if (sid) {
- props.push_back(v5::property::subscription_identifier(*sid));
- }
- if (r.tim_message_expiry) {
- auto d =
- std::chrono::duration_cast<std::chrono::seconds>(
- r.tim_message_expiry->expiry() - std::chrono::steady_clock::now()
- ).count();
- set_property<v5::property::message_expiry_interval>(
- props,
- v5::property::message_expiry_interval(
- static_cast<uint32_t>(d)
- )
- );
- }
- ssr.get().publish(
- timer_ioc_,
- r.topic,
- r.contents,
- std::min(r.qos_value, qos_value) | MQTT_NS::retain::yes,
- props
- );
- };
- std::vector<std::function<void()>> retain_deliver;
- retain_deliver.reserve(entries.size());
- // subscription identifier
- optional<std::size_t> sid;
- // An in-order list of qos settings, used to send the reply.
- // The MQTT protocol 3.1.1 - 3.8.4 Response - paragraph 6
- // allows the server to grant a lower QOS than requested
- // So we reply with the QOS setting that was granted
- // not the one requested.
- switch (ep.get_protocol_version()) {
- case protocol_version::v3_1_1: {
- std::vector<suback_return_code> res;
- res.reserve(entries.size());
- for (auto& e : entries) {
- if (security.is_subscribe_authorized(ss.get_username(), e.topic_filter)) {
- res.emplace_back(qos_to_suback_return_code(e.subopts.get_qos())); // converts to granted_qos_x
- ssr.get().subscribe(
- force_move(e.share_name),
- e.topic_filter,
- e.subopts,
- [&] {
- std::shared_lock<mutex> g(mtx_retains_);
- retains_.find(
- e.topic_filter,
- [&](retain_t const& r) {
- retain_deliver.emplace_back(
- [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
- publish_proc(r, qos_value, sid);
- }
- );
- }
- );
- }
- );
- }
- else {
- // User not authorized to subscribe to topic filter
- res.emplace_back(suback_return_code::failure);
- }
- }
- // Acknowledge the subscriptions, and the registered QOS settings
- ep.async_suback(
- packet_id,
- force_move(res),
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- } break;
- case protocol_version::v5: {
- // Get subscription identifier
- auto v = get_property<v5::property::subscription_identifier>(props);
- if (v && v.value().val() != 0) {
- sid.emplace(v.value().val());
- }
- std::vector<v5::suback_reason_code> res;
- res.reserve(entries.size());
- for (auto& e : entries) {
- if (security.is_subscribe_authorized(ss.get_username(), e.topic_filter)) {
- res.emplace_back(v5::qos_to_suback_reason_code(e.subopts.get_qos())); // converts to granted_qos_x
- ssr.get().subscribe(
- force_move(e.share_name),
- e.topic_filter,
- e.subopts,
- [&] {
- std::shared_lock<mutex> g(mtx_retains_);
- retains_.find(
- e.topic_filter,
- [&](retain_t const& r) {
- retain_deliver.emplace_back(
- [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
- publish_proc(r, qos_value, sid);
- }
- );
- }
- );
- },
- sid
- );
- }
- else {
- // User not authorized to subscribe to topic filter
- res.emplace_back(v5::suback_reason_code::not_authorized);
- }
- }
- if (h_subscribe_props_) h_subscribe_props_(props);
- // Acknowledge the subscriptions, and the registered QOS settings
- ep.async_suback(
- packet_id,
- force_move(res),
- suback_props_,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- } break;
- default:
- BOOST_ASSERT(false);
- break;
- }
- for (auto const& f : retain_deliver) {
- f();
- }
- return true;
- }
- bool unsubscribe_handler(
- con_sp_t spep,
- packet_id_t packet_id,
- std::vector<unsubscribe_entry> entries,
- v5::properties props) {
- auto& ep = *spep;
- std::shared_lock<mutex> g(mtx_sessions_);
- auto& idx = sessions_.get<tag_con>();
- auto it = idx.find(spep);
- // broker uses async_* APIs
- // If broker erase a connection, then async_force_disconnect()
- // and/or async_force_disconnect () is called.
- // During async operation, spep is valid but it has already been
- // erased from sessions_
- if (it == idx.end()) return true;
- // The element of sessions_ must have longer lifetime
- // than corresponding subscription.
- // Because the subscription store the reference of the element.
- optional<session_state_ref> ssr_opt;
- // const_cast is appropriate here
- // See https://github.com/boostorg/multi_index/issues/50
- auto& ss = const_cast<session_state&>(*it);
- ssr_opt.emplace(ss);
- BOOST_ASSERT(ssr_opt);
- session_state_ref ssr {ssr_opt.value()};
- // For each subscription that this connection has
- // Compare against the list of topic filters, and remove
- // the subscription if the topic filter is in the list.
- for (auto const& e : entries) {
- ssr.get().unsubscribe(e.share_name, e.topic_filter);
- }
- switch (ep.get_protocol_version()) {
- case protocol_version::v3_1_1:
- ep.async_unsuback(
- packet_id,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- break;
- case protocol_version::v5:
- if (h_unsubscribe_props_) h_unsubscribe_props_(props);
- ep.async_unsuback(
- packet_id,
- std::vector<v5::unsuback_reason_code>(
- entries.size(),
- v5::unsuback_reason_code::success
- ),
- unsuback_props_,
- [spep = force_move(spep)]
- (error_code ec) {
- if (ec) {
- MQTT_LOG("mqtt_broker", info)
- << MQTT_ADD_VALUE(address, spep.get())
- << ec.message();
- }
- }
- );
- break;
- default:
- BOOST_ASSERT(false);
- break;
- }
- return true;
- }
- /**
- * @brief do_publish Publish a message to any subscribed clients.
- *
- * @param source_ss - soource session_state.
- * @param topic - The topic to publish the message on.
- * @param contents - The contents of the message.
- * @param pubopts - publish options
- * @param props - properties
- */
- void do_publish(
- session_state const& source_ss,
- buffer topic,
- buffer contents,
- publish_options pubopts,
- v5::properties props
- ) {
- // Get auth rights for this topic
- // auth_users prepared once here, and then referred multiple times in subs_map_.modify() for efficiency
- auto auth_users = security.auth_sub(topic);
- // publish the message to subscribers.
- // retain is delivered as the original only if rap_value is rap::retain.
- // On MQTT v3.1.1, rap_value is always rap::dont.
- auto deliver =
- [&] (session_state& ss, subscription& sub, auto const& auth_users) {
- // See if this session is authorized to subscribe this topic
- auto access = security.auth_sub_user(auth_users, ss.get_username());
- if (access != security::authorization::type::allow) return;
- publish_options new_pubopts = std::min(pubopts.get_qos(), sub.subopts.get_qos());
- if (sub.subopts.get_rap() == rap::retain && pubopts.get_retain() == MQTT_NS::retain::yes) {
- new_pubopts |= MQTT_NS::retain::yes;
- }
- if (sub.sid) {
- props.push_back(v5::property::subscription_identifier(sub.sid.value()));
- ss.deliver(
- timer_ioc_,
- topic,
- contents,
- new_pubopts,
- props
- );
- props.pop_back();
- }
- else {
- ss.deliver(
- timer_ioc_,
- topic,
- contents,
- new_pubopts,
- props
- );
- }
- };
- // share_name topic_filter
- std::set<std::tuple<string_view, string_view>> sent;
- {
- std::shared_lock<mutex> g{mtx_subs_map_};
- subs_map_.modify(
- topic,
- [&](buffer const& /*key*/, subscription& sub) {
- if (sub.share_name.empty()) {
- // Non shared subscriptions
- // If NL (no local) subscription option is set and
- // publisher is the same as subscriber, then skip it.
- if (sub.subopts.get_nl() == nl::yes &&
- sub.ss.get().client_id() == source_ss.client_id()) return;
- deliver(sub.ss.get(), sub, auth_users);
- }
- else {
- // Shared subscriptions
- bool inserted;
- std::tie(std::ignore, inserted) = sent.emplace(sub.share_name, sub.topic_filter);
- if (inserted) {
- if (auto ssr_opt = shared_targets_.get_target(sub.share_name, sub.topic_filter)) {
- deliver(ssr_opt.value().get(), sub, auth_users);
- }
- }
- }
- }
- );
- }
- optional<std::chrono::steady_clock::duration> message_expiry_interval;
- if (source_ss.get_protocol_version() == protocol_version::v5) {
- auto v = get_property<v5::property::message_expiry_interval>(props);
- if (v) {
- message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
- }
- }
- /*
- * If the message is marked as being retained, then we
- * keep it in case a new subscription is added that matches
- * this topic.
- *
- * @note: The MQTT standard 3.3.1.3 RETAIN makes it clear that
- * retained messages are global based on the topic, and
- * are not scoped by the client id. So any client may
- * publish a retained message on any topic, and the most
- * recently published retained message on a particular
- * topic is the message that is stored on the server.
- *
- * @note: The standard doesn't make it clear that publishing
- * a message with zero length, but the retain flag not
- * set, does not result in any existing retained message
- * being removed. However, internet searching indicates
- * that most brokers have opted to keep retained messages
- * when receiving contents of zero bytes, unless the so
- * received message has the retain flag set, in which case
- * the retained message is removed.
- */
- if (pubopts.get_retain() == MQTT_NS::retain::yes) {
- if (contents.empty()) {
- std::lock_guard<mutex> g(mtx_retains_);
- retains_.erase(topic);
- }
- else {
- std::shared_ptr<as::steady_timer> tim_message_expiry;
- if (message_expiry_interval) {
- tim_message_expiry = std::make_shared<as::steady_timer>(timer_ioc_, message_expiry_interval.value());
- tim_message_expiry->async_wait(
- [this, topic = topic, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]
- (boost::system::error_code const& ec) {
- if (auto sp = wp.lock()) {
- if (!ec) {
- retains_.erase(topic);
- }
- }
- }
- );
- }
- std::lock_guard<mutex> g(mtx_retains_);
- retains_.insert_or_assign(
- topic,
- retain_t {
- force_move(topic),
- force_move(contents),
- force_move(props),
- pubopts.get_qos(),
- tim_message_expiry
- }
- );
- }
- }
- }
- private:
- as::io_context& timer_ioc_; ///< The boost asio context to run this broker on.
- as::steady_timer tim_disconnect_; ///< Used to delay disconnect handling for testing
- optional<std::chrono::steady_clock::duration> delay_disconnect_; ///< Used to delay disconnect handling for testing
- // Authorization and authentication settings
- broker::security security;
- mutable mutex mtx_subs_map_;
- sub_con_map subs_map_; /// subscription information
- shared_target shared_targets_; /// shared subscription targets
- ///< Map of active client id and connections
- /// session_state has references of subs_map_ and shared_targets_.
- /// because session_state (member of sessions_) has references of subs_map_ and shared_targets_.
- mutable mutex mtx_sessions_;
- session_states sessions_;
- mutable mutex mtx_retains_;
- retained_messages retains_; ///< A list of messages retained so they can be sent to newly subscribed clients.
- // MQTTv5 members
- v5::properties connack_props_;
- v5::properties suback_props_;
- v5::properties unsuback_props_;
- v5::properties puback_props_;
- v5::properties pubrec_props_;
- v5::properties pubrel_props_;
- v5::properties pubcomp_props_;
- std::function<void(v5::properties const&)> h_connect_props_;
- std::function<void(v5::properties const&)> h_disconnect_props_;
- std::function<void(v5::properties const&)> h_publish_props_;
- std::function<void(v5::properties const&)> h_puback_props_;
- std::function<void(v5::properties const&)> h_pubrec_props_;
- std::function<void(v5::properties const&)> h_pubrel_props_;
- std::function<void(v5::properties const&)> h_pubcomp_props_;
- std::function<void(v5::properties const&)> h_subscribe_props_;
- std::function<void(v5::properties const&)> h_unsubscribe_props_;
- std::function<void(v5::properties const&)> h_auth_props_;
- bool pingresp_ = true;
- bool connack_ = true;
- };
- MQTT_BROKER_NS_END
- #endif // MQTT_BROKER_BROKER_HPP
|