| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194 | // Copyright Takatoshi Kondo 2017//// Distributed under the Boost Software License, Version 1.0.// (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)#if !defined(MQTT_BROKER_BROKER_HPP)#define MQTT_BROKER_BROKER_HPP#include <mqtt/config.hpp>#include <set>#include <boost/lexical_cast.hpp>#include <mqtt/broker/broker_namespace.hpp>#include <mqtt/optional.hpp>#include <mqtt/property.hpp>#include <mqtt/visitor_util.hpp>#include <mqtt/broker/session_state.hpp>#include <mqtt/broker/sub_con_map.hpp>#include <mqtt/broker/retained_messages.hpp>#include <mqtt/broker/retained_topic_map.hpp>#include <mqtt/broker/shared_target_impl.hpp>#include <mqtt/broker/mutex.hpp>#include <mqtt/broker/uuid.hpp>#include <mqtt/broker/constant.hpp>#include <mqtt/broker/security.hpp>MQTT_BROKER_NS_BEGINnamespace mi = boost::multi_index;namespace as = boost::asio;class broker_t {public:    broker_t(as::io_context& timer_ioc)        :timer_ioc_(timer_ioc),         tim_disconnect_(timer_ioc_) {        security.default_config();    }    // [begin] for test setting    /**     * @brief set_disconnect_delay adds a delay to disconnect operations.     *     * This makes the broker wait the specified amount between when a disconnect     * is received from a client, and when the connection is actually closed in     * the broker.     *     * @param delay - the amount to delay by     */    void set_disconnect_delay(std::chrono::steady_clock::duration delay) {        delay_disconnect_ = force_move(delay);    }    /**     * @brief set pingresp send operaton     *     * @param b - if true, send pingresp when pingreq is received.     *            if false, doesn't send pingresp for test.     */    void set_pingresp(bool b) {        pingresp_ = b;    }    /**     * @brief set pingresp send operaton     *     * @param b - if true, send connack when connect is received.     *            if false, doesn't send connack for test.     */    void set_connack(bool b) {        connack_ = b;    }    /**     * @brief configure the security settings     */    void set_security(broker::security&& security) {        this->security = force_move(security);    }    // [end] for test setting    /**     * @brief handle_accept     *     * Call this function when an server (of whatever kind) has accepted a raw     * connection from an MQTT client. By 'raw connection', this might be raw TCP sockets     * or websockets, or completed a TLS handshake, or any other underlying transport     * type, but what is not meant is that the mqtt client on the other end of the endpoint     * has initiated the MQTT application protocol connection sequence with CONNECT or CONACK     * messages being sent or received.     *     * This function will assign several event handlers into server (of whatever kind)     * that is provided as a parameter. This includes connection handlers, disconnection handlers     * and various handlers for a variety of of MQTT message types.     *     * @param ep - The server (of whichever kind) to accept a connection on.     */    void handle_accept(con_sp_t spep) {        con_wp_t wp(spep);        endpoint_t& ep = *spep;        ep.socket().lowest_layer().set_option(as::ip::tcp::no_delay(true));        ep.set_auto_pub_response(false);        ep.set_async_operation(true);        ep.set_topic_alias_maximum(MQTT_NS::topic_alias_max);        // set connection (lower than MQTT) level handlers        ep.set_close_handler(            [this, wp]            (){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                close_proc(force_move(sp), true);            });        ep.set_error_handler(            [this, wp]            (error_code ec){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto ver = sp->get_protocol_version();                MQTT_LOG("mqtt_broker", info)                    << MQTT_ADD_VALUE(address, this)                    << " error_handler is called. ec:" << ec.message() << " protocol_version:" << ver;                auto send_response =                    [&](auto ec) {                        if (sp->connected()) {                            auto rc =                                [&] () -> MQTT_NS::optional<v5::disconnect_reason_code> {                                    if (ec == boost::system::errc::protocol_error) {                                        return MQTT_NS::v5::disconnect_reason_code::protocol_error;                                    }                                    else if (ec == boost::system::errc::bad_message) {                                        return MQTT_NS::v5::disconnect_reason_code::malformed_packet;                                    }                                    return MQTT_NS::nullopt;                                }();                            if (rc) {                                MQTT_LOG("mqtt_broker", trace)                                    << MQTT_ADD_VALUE(address, this)                                    << "send DISCONNECT reason_code:" << rc.value();                                sp->async_disconnect(                                    rc.value(),                                    v5::properties{},                                    [sp]                                    (error_code ec) {                                        if (ec) {                                            MQTT_LOG("mqtt_broker", info)                                                << MQTT_ADD_VALUE(address, sp.get())                                                << ec.message();                                        }                                    }                                );                            }                        }                        else if (sp->underlying_connected()){                            // underlying layer connected, mqtt connecting                            // and protocol_version has already been determind as v5                            auto rc =                                [&] () -> MQTT_NS::optional<v5::connect_reason_code> {                                    if (ec ==boost::system::errc::protocol_error) {                                        return MQTT_NS::v5::connect_reason_code::protocol_error;                                    }                                    else if (ec == boost::system::errc::bad_message) {                                        return MQTT_NS::v5::connect_reason_code::malformed_packet;                                    }                                    return MQTT_NS::nullopt;                                }();                            if (rc) {                                MQTT_LOG("mqtt_broker", trace)                                    << MQTT_ADD_VALUE(address, this)                                    << "send CONNACK reason_code:" << rc.value();                                if (connack_) sp->async_connack(                                    false,                                    rc.value(),                                    [sp]                                    (error_code ec) {                                        if (ec) {                                            MQTT_LOG("mqtt_broker", info)                                                << MQTT_ADD_VALUE(address, sp.get())                                                << ec.message();                                        }                                    }                                );                            }                        }                    };                switch (ver) {                case MQTT_NS::protocol_version::v5:                    // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#S4_13_Errors                    // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205                    //                    // The DISCONNECT packet is the final MQTT Control Packet sent from the Client or                    // the Server.                    send_response(ec);                    break;                case MQTT_NS::protocol_version::v3_1_1:                    // DISCONNECT can't be sent by broker on v3.1.1                    //                    // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090                    //                    // The DISCONNECT Packet is the final Control Packet sent from the Client to the Server.                    // It indicates that the Client is disconnecting cleanly.                    //                    // At the MQTT connecting, there is no appropriate Connect Return Code on v3.1.1                    // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718035                    break;                default:                    // The protocol_version is in the CONNECT packet.                    // Protocol error could happen before the protocol_version is parsed.                    break;                }                close_proc(force_move(sp), true);            }        );        // set MQTT level handlers        ep.set_connect_handler(            [this, wp]            (buffer client_id,             optional<buffer> username,             optional<buffer> password,             optional<will> will,             bool clean_session,             std::uint16_t keep_alive) {                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return connect_handler(                        force_move(sp),                        force_move(client_id),                        force_move(username),                        force_move(password),                        force_move(will),                        clean_session,                        keep_alive,                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_connect_handler(            [this, wp]            (buffer client_id,             optional<buffer> username,             optional<buffer> password,             optional<will> will,             bool clean_start,             std::uint16_t keep_alive,             v5::properties props) {                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return connect_handler(                        force_move(sp),                        force_move(client_id),                        force_move(username),                        force_move(password),                        force_move(will),                        clean_start,                        keep_alive,                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_disconnect_handler(            [this, wp]            (){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    disconnect_handler(force_move(sp));                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                }            }        );        ep.set_v5_disconnect_handler(            [this, wp]            (v5::disconnect_reason_code /*reason_code*/, v5::properties props) {                if (h_disconnect_props_) h_disconnect_props_(force_move(props));                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    disconnect_handler(force_move(sp));                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                }            }        );        ep.set_puback_handler(            [this, wp]            (packet_id_t packet_id){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return puback_handler(                        force_move(sp),                        packet_id,                        v5::puback_reason_code::success,                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_puback_handler(            [this, wp]            (packet_id_t packet_id,             v5::puback_reason_code reason_code,             v5::properties props){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return puback_handler(                        force_move(sp),                        packet_id,                        reason_code,                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_pubrec_handler(            [this, wp]            (packet_id_t packet_id){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return pubrec_handler(                        force_move(sp),                        packet_id,                        v5::pubrec_reason_code::success,                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_pubrec_handler(            [this, wp]            (packet_id_t packet_id,             v5::pubrec_reason_code reason_code,             v5::properties props){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return pubrec_handler(                        force_move(sp),                        packet_id,                        reason_code,                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_pubrel_handler(            [this, wp]            (packet_id_t packet_id){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return pubrel_handler(                        force_move(sp),                        packet_id,                        v5::pubrel_reason_code::success,                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_pubrel_handler(            [this, wp]            (packet_id_t packet_id,             v5::pubrel_reason_code reason_code,             v5::properties props){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return pubrel_handler(                        force_move(sp),                        packet_id,                        reason_code,                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_pubcomp_handler(            [this, wp]            (packet_id_t packet_id){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return pubcomp_handler(                        force_move(sp),                        packet_id,                        v5::pubcomp_reason_code::success,                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_pubcomp_handler(            [this, wp]            (packet_id_t packet_id,             v5::pubcomp_reason_code reason_code,             v5::properties props){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return pubcomp_handler(                        force_move(sp),                        packet_id,                        reason_code,                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_publish_handler(            [this, wp]            (optional<packet_id_t> packet_id,             publish_options pubopts,             buffer topic_name,             buffer contents){                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return publish_handler(                        force_move(sp),                        packet_id,                        pubopts,                        force_move(topic_name),                        force_move(contents),                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_publish_handler(            [this, wp]            (optional<packet_id_t> packet_id,             publish_options pubopts,             buffer topic_name,             buffer contents,             v5::properties props            ) {                if (h_publish_props_) h_publish_props_(props);                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return publish_handler(                        force_move(sp),                        packet_id,                        pubopts,                        force_move(topic_name),                        force_move(contents),                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_subscribe_handler(            [this, wp]            (packet_id_t packet_id,             std::vector<subscribe_entry> entries) {                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return subscribe_handler(                        force_move(sp),                        packet_id,                        force_move(entries),                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_subscribe_handler(            [this, wp]            (packet_id_t packet_id,             std::vector<subscribe_entry> entries,             v5::properties props            ) {                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return subscribe_handler(                        force_move(sp),                        packet_id,                        force_move(entries),                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_unsubscribe_handler(            [this, wp]            (packet_id_t packet_id,             std::vector<unsubscribe_entry> entries) {                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return unsubscribe_handler(                        force_move(sp),                        packet_id,                        force_move(entries),                        v5::properties{}                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_v5_unsubscribe_handler(            [this, wp]            (packet_id_t packet_id,             std::vector<unsubscribe_entry> entries,             v5::properties props            ) {                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                auto p = sp.get();                try {                    return unsubscribe_handler(                        force_move(sp),                        packet_id,                        force_move(entries),                        force_move(props)                    );                }                catch (std::exception const& ex) {                    MQTT_LOG("mqtt_broker", error)                        << MQTT_ADD_VALUE(address, p)                        << ex.what();                    return true;                }            }        );        ep.set_pingreq_handler(            [this, wp] {                con_sp_t sp = wp.lock();                BOOST_ASSERT(sp);                if (pingresp_) {                    auto p = sp.get();                    p->async_pingresp(                        [sp = force_move(sp)]                        (error_code ec) {                            if (ec) {                                MQTT_LOG("mqtt_broker", info)                                    << MQTT_ADD_VALUE(address, sp.get())                                    << ec.message();                            }                        }                    );                }                return true;            }        );        ep.set_v5_auth_handler(            [this]            (v5::auth_reason_code /*reason_code*/,             v5::properties props            ) {                if (h_auth_props_) h_auth_props_(force_move(props));                return true;            }        );        // Pass spep to keep lifetime.        // It makes sure wp.lock() never return nullptr in the handlers below        // including close_handler and error_handler.        ep.start_session(spep);    }    void set_connack_props(v5::properties props) {        connack_props_ = force_move(props);    }    void set_suback_props(v5::properties props) {        suback_props_ = force_move(props);    }    void set_unsuback_props(v5::properties props) {        unsuback_props_ = force_move(props);    }    void set_puback_props(v5::properties props) {        puback_props_ = force_move(props);    }    void set_pubrec_props(v5::properties props) {        pubrec_props_ = force_move(props);    }    void set_pubrel_props(v5::properties props) {        pubrel_props_ = force_move(props);    }    void set_pubcomp_props(v5::properties props) {        pubcomp_props_ = force_move(props);    }    void set_connect_props_handler(std::function<void(v5::properties const&)> h) {        h_connect_props_ = force_move(h);    }    void set_disconnect_props_handler(std::function<void(v5::properties const&)> h) {        h_disconnect_props_ = force_move(h);    }    void set_publish_props_handler(std::function<void(v5::properties const&)> h) {        h_publish_props_ = force_move(h);    }    void set_puback_props_handler(std::function<void(v5::properties const&)> h) {        h_puback_props_ = force_move(h);    }    void set_pubrec_props_handler(std::function<void(v5::properties const&)> h) {        h_pubrec_props_ = force_move(h);    }    void set_pubrel_props_handler(std::function<void(v5::properties const&)> h) {        h_pubrel_props_ = force_move(h);    }    void set_pubcomp_props_handler(std::function<void(v5::properties const&)> h) {        h_pubcomp_props_ = force_move(h);    }    void set_subscribe_props_handler(std::function<void(v5::properties const&)> h) {        h_subscribe_props_ = force_move(h);    }    void set_unsubscribe_props_handler(std::function<void(v5::properties const&)> h) {        h_unsubscribe_props_ = force_move(h);    }    void set_auth_props_handler(std::function<void(v5::properties const&)> h) {        h_auth_props_ = force_move(h);    }    void clear_all_sessions() {        std::lock_guard<mutex> g(mtx_sessions_);        sessions_.clear();    }    void clear_all_retained_topics() {        std::lock_guard<mutex> g(mtx_retains_);        retains_.clear();    }private:    static void force_disconnect(con_sp_t spep) {        auto p = spep.get();        p->async_force_disconnect(            [spep = force_move(spep)]            (error_code ec) {                if (ec) {                    MQTT_LOG("mqtt_broker", info)                        << MQTT_ADD_VALUE(address, spep.get())                        << ec.message();                }            }        );    };    static void disconnect_and_force_disconnect(con_sp_t spep, v5::disconnect_reason_code rc) {        auto p = spep.get();        p->async_disconnect(            rc,            v5::properties{},            [spep = force_move(spep)]            (error_code) mutable {                force_disconnect(force_move(spep));            }        );    };    /**     * @brief connect_proc Process an incoming CONNECT packet     *     * This is called by the connect_handler function, which is registered     * on mqtt connections where the raw transport (tcp / tls / websocket / etc)     * is established, but the CONNECT message has not been sent / received by     * the mqtt client on the other end of the connection.     *     * When the CONNECT message is received, this function is called after some     * basic pre-connection logic, to setup the record keeping that this broker     * class needs to handle the connection and process subscriptions and publishing.     *     * @param clean_start - if the clean-start flag is set on the CONNECT message.     * @param spep - varient of shared pointers to underlying connection type.     * @param req_client_id - the id that the client wants to use (username will be prepended)     * @param will - the last-will-and-testiment of the connection, if any.     */    bool connect_handler(        con_sp_t spep,        buffer client_id,        optional<buffer> noauth_username,        optional<buffer> password,        optional<will> will,        bool clean_start,        std::uint16_t /*keep_alive*/,        v5::properties props    ) {        auto& ep = *spep;        optional<std::string> username;        if (ep.get_preauthed_user_name()) {            if (security.login_cert(ep.get_preauthed_user_name().value())) {                username = ep.get_preauthed_user_name();            }        }        else if (!noauth_username && !password) {            username = security.login_anonymous();        }        else if (noauth_username && password) {            username = security.login(*noauth_username, *password);        }        // If login fails, try the unauthenticated user        if (!username) username = security.login_unauthenticated();        v5::properties connack_props;        connect_param cp = handle_connect_props(ep, props, will);        if (!username) {            MQTT_LOG("mqtt_broker", trace)                << MQTT_ADD_VALUE(address, this)                << "User failed to login: "                << (noauth_username ? std::string(*noauth_username) : std::string("anonymous user"));            send_connack(                ep,                false, // session present                false, // authenticated                force_move(connack_props),                [spep](error_code) {                    disconnect_and_force_disconnect(spep, v5::disconnect_reason_code::not_authorized);                }            );            return true;        }        if (client_id.empty()) {            if (!handle_empty_client_id(spep, client_id, clean_start, connack_props)) {                return false;            }            // A new client id was generated            client_id = buffer(string_view(spep->get_client_id()));        }        MQTT_LOG("mqtt_broker", trace)            << MQTT_ADD_VALUE(address, this)            << "User logged in as: '" << *username << "', client_id: " << client_id;        /**         * http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345311         * 3.1.2.4 Clean Start         * If a CONNECT packet is received with Clean Start is set to 1, the Client and Server MUST         * discard any existing Session and start a new Session [MQTT-3.1.2-4]. Consequently,         *  the Session Present flag in CONNACK is always set to 0 if Clean Start is set to 1.         */        // Find any sessions that have the same client_id        std::lock_guard<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_cid>();        auto it = idx.lower_bound(std::make_tuple(*username, client_id));        if (it == idx.end() ||            it->client_id() != client_id ||            it->get_username() != *username        ) {            // new connection            MQTT_LOG("mqtt_broker", trace)                << MQTT_ADD_VALUE(address, this)                << "cid:" << client_id                << " new connection inserted.";            it = idx.emplace_hint(                it,                timer_ioc_,                mtx_subs_map_,                subs_map_,                shared_targets_,                spep,                client_id,                *username,                force_move(will),                // will_sender                [this](auto&&... params) {                    do_publish(std::forward<decltype(params)>(params)...);                },                force_move(cp.will_expiry_interval),                force_move(cp.session_expiry_interval)            );            if (cp.response_topic_requested) {                // set_response_topic never modify key part                set_response_topic(const_cast<session_state&>(*it), connack_props, *username);            }            send_connack(                ep,                false, // session present                true,  // authenticated                force_move(connack_props)            );        }        else if (it->online()) {            // online overwrite            if (close_proc_no_lock(it->con(), true, v5::disconnect_reason_code::session_taken_over)) {                // remain offline                if (clean_start) {                    // discard offline session                    MQTT_LOG("mqtt_broker", trace)                        << MQTT_ADD_VALUE(address, this)                        << "cid:" << client_id                        << "online connection exists, discard old one due to new one's clean_start and renew";                    if (cp.response_topic_requested) {                        // set_response_topic never modify key part                        set_response_topic(const_cast<session_state&>(*it), connack_props, *username);                    }                    send_connack(                        ep,                        false, // session present                        true,  // authenticated                        force_move(connack_props)                    );                    idx.modify(                        it,                        [&](auto& e) {                            e.clean();                            e.update_will(timer_ioc_, force_move(will), cp.will_expiry_interval);                            e.set_username(*username);                            // renew_session_expiry updates index                            e.renew_session_expiry(force_move(cp.session_expiry_interval));                        },                        [](auto&) { BOOST_ASSERT(false); }                    );                }                else {                    // inherit online session if previous session's session exists                    MQTT_LOG("mqtt_broker", trace)                        << MQTT_ADD_VALUE(address, this)                        << "cid:" << client_id                        << "online connection exists, inherit old one and renew";                    if (cp.response_topic_requested) {                        // set_response_topic never modify key part                        set_response_topic(const_cast<session_state&>(*it), connack_props, *username);                    }                    send_connack(                        ep,                        true, // session present                        true, // authenticated                        force_move(connack_props),                        [                            this,                            &idx,                            it,                            will = force_move(will),                            clean_start,                            spep,                            will_expiry_interval = cp.will_expiry_interval,                            session_expiry_interval = cp.session_expiry_interval,                            username                        ]                        (error_code ec) mutable {                            if (ec) {                                MQTT_LOG("mqtt_broker", trace)                                    << MQTT_ADD_VALUE(address, this)                                    << ec.message();                                return;                            }                            idx.modify(                                it,                                [&](auto& e) {                                    e.renew(spep, clean_start);                                    e.set_username(*username);                                    e.update_will(timer_ioc_, force_move(will), will_expiry_interval);                                    // renew_session_expiry updates index                                    e.renew_session_expiry(force_move(session_expiry_interval));                                    e.send_inflight_messages();                                    e.send_all_offline_messages();                                },                                [](auto&) { BOOST_ASSERT(false); }                            );                        }                    );                }            }            else {                // new connection                MQTT_LOG("mqtt_broker", trace)                    << MQTT_ADD_VALUE(address, this)                    << "cid:" << client_id                    << "online connection exists, discard old one due to session_expiry and renew";                bool inserted;                std::tie(it, inserted) = idx.emplace(                    timer_ioc_,                    mtx_subs_map_,                    subs_map_,                    shared_targets_,                    spep,                    client_id,                    *username,                    force_move(will),                    // will_sender                    [this](auto&&... params) {                        do_publish(std::forward<decltype(params)>(params)...);                    },                    force_move(cp.will_expiry_interval),                    force_move(cp.session_expiry_interval)                );                BOOST_ASSERT(inserted);                if (cp.response_topic_requested) {                    // set_response_topic never modify key part                    set_response_topic(const_cast<session_state&>(*it), connack_props, *username);                }                send_connack(                    ep,                    false, // session present                    true,  // authenticated                    force_move(connack_props)                );            }        }        else {            // offline -> online            if (clean_start) {                // discard offline session                MQTT_LOG("mqtt_broker", trace)                    << MQTT_ADD_VALUE(address, this)                    << "cid:" << client_id                    << "offline connection exists, discard old one due to new one's clean_start and renew";                if (cp.response_topic_requested) {                    // set_response_topic never modify key part                    set_response_topic(const_cast<session_state&>(*it), connack_props, *username);                }                send_connack(                    ep,                    false, // session present                    true,  // authenticated                    force_move(connack_props)                );                idx.modify(                    it,                    [&](auto& e) {                        e.clean();                        e.renew(spep, clean_start);                        e.update_will(timer_ioc_, force_move(will), cp.will_expiry_interval);                        e.set_username(*username);                        // renew_session_expiry updates index                        e.renew_session_expiry(force_move(cp.session_expiry_interval));                    },                    [](auto&) { BOOST_ASSERT(false); }                );            }            else {                // inherit offline session                MQTT_LOG("mqtt_broker", trace)                    << MQTT_ADD_VALUE(address, this)                    << "cid:" << client_id                    << "offline connection exists, inherit old one and renew";                if (cp.response_topic_requested) {                    // set_response_topic never modify key part                    set_response_topic(const_cast<session_state&>(*it), connack_props, *username);                }                send_connack(                    ep,                    true, // session present                    true,  // authenticated                    force_move(connack_props),                    [                        this,                        &idx,                        it,                        will = force_move(will),                        clean_start,                        spep,                        will_expiry_interval = cp.will_expiry_interval,                        session_expiry_interval = cp.session_expiry_interval,                        username                    ]                    (error_code ec) mutable {                        if (ec) {                            MQTT_LOG("mqtt_broker", trace)                                << MQTT_ADD_VALUE(address, this)                                << ec.message();                            return;                        }                        idx.modify(                            it,                            [&](auto& e) {                                e.renew(spep, clean_start);                                e.set_username(*username);                                e.update_will(timer_ioc_, force_move(will), will_expiry_interval);                                // renew_session_expiry updates index                                e.renew_session_expiry(force_move(session_expiry_interval));                                e.send_inflight_messages();                                e.send_all_offline_messages();                            },                            [](auto&) { BOOST_ASSERT(false); }                        );                    }                );            }        }        return true;    }    struct connect_param {        optional<std::chrono::steady_clock::duration> session_expiry_interval;        optional<std::chrono::steady_clock::duration> will_expiry_interval;        bool response_topic_requested = false;    };    connect_param handle_connect_props(        endpoint_t& ep,        v5::properties const& props,        optional<will> const& will    ) {        connect_param cp;        if (ep.get_protocol_version() == protocol_version::v5) {            {                auto v = get_property<v5::property::session_expiry_interval>(props);                if (v && v.value().val() != 0) {                    cp.session_expiry_interval.emplace(std::chrono::seconds(v.value().val()));                }            }            {                auto v = get_property<v5::property::request_response_information>(props);                if (v && v.value().val() == 1) {                    cp.response_topic_requested = true;                }            }            if (will) {                auto v = get_property<v5::property::message_expiry_interval>(will.value().props());                if (v) {                    cp.will_expiry_interval.emplace(std::chrono::seconds(v.value().val()));                }            }            if (h_connect_props_) {                h_connect_props_(props);            }        }        return cp;    }    void send_connack(        endpoint_t& ep,        bool session_present,        bool authenticated,        v5::properties props,        std::function<void(error_code)> finish = [](error_code){}    ) {        // Reply to the connect message.        switch (ep.get_protocol_version()) {        case protocol_version::v3_1_1:            if (connack_) ep.async_connack(                session_present,                authenticated ? connect_return_code::accepted                              : connect_return_code::not_authorized,                [finish = force_move(finish)]                (error_code ec) {                    finish(ec);                }            );            break;        case protocol_version::v5:            // connack_props_ member varible is for testing            if (connack_props_.empty()) {                // props local variable is is for real case                props.emplace_back(v5::property::topic_alias_maximum{topic_alias_max});                props.emplace_back(v5::property::receive_maximum{receive_maximum_max});                if (connack_) ep.async_connack(                    session_present,                    authenticated ? v5::connect_reason_code::success                                  : v5::connect_reason_code::not_authorized,                    force_move(props),                    [finish = force_move(finish)]                    (error_code ec) {                        finish(ec);                    }                );            }            else {                // use connack_props_ for testing                if (connack_) ep.async_connack(                    session_present,                    authenticated ? v5::connect_reason_code::success                                  : v5::connect_reason_code::not_authorized,                    connack_props_,                    [finish = force_move(finish)]                    (error_code ec) {                        finish(ec);                    }                );            }            break;        default:            BOOST_ASSERT(false);            break;        }    }    void remove_rule(std::size_t rule_nr) {        security.remove_auth(rule_nr);    }    void set_response_topic(session_state& s, v5::properties& connack_props, std::string const &username) {        auto response_topic =            [&] {                if (auto rt_opt = s.get_response_topic()) {                    return rt_opt.value();                }                auto rt = create_uuid_string();                s.set_response_topic(rt);                return rt;            } ();        auto rule_nr = security.add_auth(            response_topic,            { "@any" }, MQTT_NS::broker::security::authorization::type::allow,            { username }, MQTT_NS::broker::security::authorization::type::allow        );        s.set_clean_handler(            [this, response_topic, rule_nr]() {                std::lock_guard<mutex> g(mtx_retains_);                retains_.erase(response_topic);                remove_rule(rule_nr);            }        );        connack_props.emplace_back(            v5::property::response_topic(                allocate_buffer(response_topic)            )        );    }    bool handle_empty_client_id(        con_sp_t spep,        buffer const& client_id,        bool clean_start,        v5::properties& connack_props    ) {        auto& ep = *spep;        switch (ep.get_protocol_version()) {        case protocol_version::v3_1_1:            if (client_id.empty()) {                if (clean_start) {                    ep.set_client_id(create_uuid_string());                }                else {                    // https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242                    // If the Client supplies a zero-byte ClientId,                    // the Client MUST also set CleanSession to 1 [MQTT-3.1.3-7].                    // If it's a not a clean session, but no client id is provided,                    // we would have no way to map this connection's session to a new connection later.                    // So the connection must be rejected.                    if (connack_) {                        ep.async_connack(                            false,                            connect_return_code::identifier_rejected,                            [&ep, spep = force_move(spep)]                            (error_code ec) mutable {                                if (ec) {                                    MQTT_LOG("mqtt_broker", info)                                        << MQTT_ADD_VALUE(address, spep.get())                                        << ec.message();                                }                                ep.async_force_disconnect(                                    [spep = force_move(spep)]                                    (error_code ec) {                                        if (ec) {                                            MQTT_LOG("mqtt_broker", info)                                                << MQTT_ADD_VALUE(address, spep.get())                                                << ec.message();                                        }                                    }                                );                            }                        );                    }                    return false;                }            }            break;        case protocol_version::v5:            if (client_id.empty()) {                // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059                //  A Server MAY allow a Client to supply a ClientID that has a length of zero bytes,                // however if it does so the Server MUST treat this as a special case and assign a                // unique ClientID to that Client [MQTT-3.1.3-6]. It MUST then process the                // CONNECT packet as if the Client had provided that unique ClientID,                // and MUST return the Assigned Client Identifier in the CONNACK packet [MQTT-3.1.3-7].                // If the Server rejects the ClientID it MAY respond to the CONNECT packet with a CONNACK                // using Reason Code 0x85 (Client Identifier not valid) as described in section 4.13                // Handling errors, and then it MUST close the Network Connection [MQTT-3.1.3-8].                //                // mqtt_cpp author's note: On v5.0, no Clean Start restriction is described.                ep.set_client_id(create_uuid_string());                connack_props.emplace_back(                    v5::property::assigned_client_identifier(buffer(string_view(ep.get_client_id())))                );            }            break;        default:            BOOST_ASSERT(false);            return false;        }        return true;    }    void disconnect_handler(        con_sp_t spep    ) {        if (delay_disconnect_) {            tim_disconnect_.expires_after(delay_disconnect_.value());            tim_disconnect_.wait();        }        close_proc(force_move(spep), false);    }    /**     * @brief close_proc_no_lock - clean up a connection that has been closed.     *     * @param ep - The underlying server (of whichever type) that is disconnecting.     * @param send_will - Whether to publish this connections last will     * @return true if offline session is remained, otherwise false     */    // TODO: Maybe change the name of this function.    bool close_proc_no_lock(        con_sp_t spep,        bool send_will,        optional<v5::disconnect_reason_code> rc) {        endpoint_t& ep = *spep;        auto& idx = sessions_.get<tag_con>();        auto it = idx.find(spep);        // act_sess_it == act_sess_idx.end() could happen if broker accepts        // the session from client but the client closes the session  before sending        // MQTT `CONNECT` message.        // In this case, do nothing is correct behavior.        if (it == idx.end()) return false;        bool session_clear =            [&] {                if (ep.get_protocol_version() == protocol_version::v3_1_1) {                    return ep.clean_session();                }                else {                    BOOST_ASSERT(ep.get_protocol_version() == protocol_version::v5);                    auto const& sei_opt = it->session_expiry_interval();                    return !sei_opt || sei_opt.value() == std::chrono::steady_clock::duration::zero();                }            } ();        auto do_send_will =            [&](session_state& ss) {                if (send_will) {                    ss.send_will();                }                else {                    ss.clear_will();                }            };        if (session_clear) {            // const_cast is appropriate here            // See https://github.com/boostorg/multi_index/issues/50            auto& ss = const_cast<session_state&>(*it);            do_send_will(ss);            if (rc) {                MQTT_LOG("mqtt_broker", trace)                    << MQTT_ADD_VALUE(address, spep.get())                    << "disconnect_and_force_disconnect(async) cid:" << ss.client_id();                disconnect_and_force_disconnect(spep, rc.value());            }            else {                MQTT_LOG("mqtt_broker", trace)                    << MQTT_ADD_VALUE(address, spep.get())                    << "force_disconnect(async) cid:" << ss.client_id();                force_disconnect(spep);            }            idx.erase(it);            BOOST_ASSERT(sessions_.get<tag_con>().find(spep) == sessions_.get<tag_con>().end());            return false;        }        else {            idx.modify(                it,                [&](session_state& ss) {                    do_send_will(ss);                    if (rc) {                        MQTT_LOG("mqtt_broker", trace)                            << MQTT_ADD_VALUE(address, spep.get())                            << "disconnect_and_force_disconnect(async) cid:" << ss.client_id();                        disconnect_and_force_disconnect(spep, rc.value());                    }                    else {                        MQTT_LOG("mqtt_broker", trace)                            << MQTT_ADD_VALUE(address, spep.get())                            << "force_disconnect(async) cid:" << ss.client_id();                        force_disconnect(spep);                    }                    // become_offline updates index                    ss.become_offline(                        [this]                        (std::shared_ptr<as::steady_timer> const& sp_tim) {                            sessions_.get<tag_tim>().erase(sp_tim);                        }                    );                },                [](auto&) { BOOST_ASSERT(false); }            );            return true;        }    }    /**     * @brief close_proc - clean up a connection that has been closed.     *     * @param ep - The underlying server (of whichever type) that is disconnecting.     * @param send_will - Whether to publish this connections last will     * @param rc - Reason Code for send pack DISCONNECT     * @return true if offline session is remained, otherwise false     */    // TODO: Maybe change the name of this function.    bool close_proc(        con_sp_t spep,        bool send_will,        optional<v5::disconnect_reason_code> rc = nullopt    ) {        std::lock_guard<mutex> g(mtx_sessions_);        return close_proc_no_lock(force_move(spep), send_will, rc);    }    bool publish_handler(        con_sp_t spep,        optional<packet_id_t> packet_id,        publish_options pubopts,        buffer topic_name,        buffer contents,        v5::properties props) {        auto& ep = *spep;        std::shared_lock<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_con>();        auto it = idx.find(spep);        // broker uses async_* APIs        // If broker erase a connection, then async_force_disconnect()        // and/or async_force_disconnect () is called.        // During async operation, spep is valid but it has already been        // erased from sessions_        if (it == idx.end()) return true;        auto send_pubres =            [&] (bool authorized = true) {                switch (pubopts.get_qos()) {                case qos::at_least_once:                    ep.async_puback(                        packet_id.value(),                        authorized ? v5::puback_reason_code::success                                   : v5::puback_reason_code::not_authorized,                        puback_props_,                        [spep = force_move(spep)]                        (error_code ec) {                            if (ec) {                                MQTT_LOG("mqtt_broker", info)                                    << MQTT_ADD_VALUE(address, spep.get())                                    << ec.message();                            }                        }                    );                    break;                case qos::exactly_once: {                    ep.async_pubrec(                        packet_id.value(),                        authorized ? v5::pubrec_reason_code::success                                   : v5::pubrec_reason_code::not_authorized,                        pubrec_props_,                        [spep = force_move(spep)]                        (error_code ec) {                            if (ec) {                                MQTT_LOG("mqtt_broker", info)                                    << MQTT_ADD_VALUE(address, spep.get())                                    << ec.message();                            }                        }                    );                } break;                default:                    break;                }            };        // See if this session is authorized to publish this topic        if (security.auth_pub(topic_name, it->get_username()) != security::authorization::type::allow) {            // Publish not authorized            send_pubres(false);            return true;        }        v5::properties forward_props;        for (auto&& p : props) {            MQTT_NS::visit(                make_lambda_visitor(                    [](v5::property::topic_alias&&) {                        // TopicAlias is not forwarded                        // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113                        // A receiver MUST NOT carry forward any Topic Alias mappings from                        // one Network Connection to another [MQTT-3.3.2-7].                    },                    [&ep](v5::property::subscription_identifier&& p) {                        MQTT_LOG("mqtt_broker", warning)                            << MQTT_ADD_VALUE(address, &ep)                            << "Subscription Identifier from client not forwarded sid:" << p.val();                    },                    [&forward_props](auto&& p) {                        forward_props.push_back(force_move(p));                    }                ),                force_move(p)            );        }        do_publish(            *it,            force_move(topic_name),            force_move(contents),            pubopts.get_qos() | pubopts.get_retain(), // remove dup flag            force_move(forward_props)        );        send_pubres();        return true;    }    bool puback_handler(        con_sp_t spep,        packet_id_t packet_id,        v5::puback_reason_code /*reason_code*/,        v5::properties /*props*/) {        std::shared_lock<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_con>();        auto it = idx.find(spep);        // broker uses async_* APIs        // If broker erase a connection, then async_force_disconnect()        // and/or async_force_disconnect () is called.        // During async operation, spep is valid but it has already been        // erased from sessions_        if (it == idx.end()) return true;        // const_cast is appropriate here        // See https://github.com/boostorg/multi_index/issues/50        auto& ss = const_cast<session_state&>(*it);        ss.erase_inflight_message_by_packet_id(packet_id);        ss.send_offline_messages_by_packet_id_release();        return true;    }    bool pubrec_handler(        con_sp_t spep,        packet_id_t packet_id,        v5::pubrec_reason_code reason_code,        v5::properties /*props*/) {        std::shared_lock<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_con>();        auto it = idx.find(spep);        // broker uses async_* APIs        // If broker erase a connection, then async_force_disconnect()        // and/or async_force_disconnect () is called.        // During async operation, spep is valid but it has already been        // erased from sessions_        if (it == idx.end()) return true;        // const_cast is appropriate here        // See https://github.com/boostorg/multi_index/issues/50        auto& ss = const_cast<session_state&>(*it);        ss.erase_inflight_message_by_packet_id(packet_id);        if (is_error(reason_code)) return true;        auto& ep = *spep;        switch (ep.get_protocol_version()) {        case protocol_version::v3_1_1:            ep.async_pubrel(                packet_id,                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );            break;        case protocol_version::v5:            ep.async_pubrel(                packet_id,                v5::pubrel_reason_code::success,                pubrel_props_,                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );            break;        default:            BOOST_ASSERT(false);            break;        }        return true;    }    bool pubrel_handler(        con_sp_t spep,        packet_id_t packet_id,        v5::pubrel_reason_code reason_code,        v5::properties /*props*/) {        std::shared_lock<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_con>();        auto it = idx.find(spep);        // broker uses async_* APIs        // If broker erase a connection, then async_force_disconnect()        // and/or async_force_disconnect () is called.        // During async operation, spep is valid but it has already been        // erased from sessions_        if (it == idx.end()) return true;        auto& ep = *spep;        switch (ep.get_protocol_version()) {        case protocol_version::v3_1_1:            ep.async_pubcomp(                packet_id,                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );            break;        case protocol_version::v5:            ep.async_pubcomp(                packet_id,                // pubcomp reason code is the same as pubrel one                static_cast<v5::pubcomp_reason_code>(reason_code),                pubcomp_props_,                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );            break;        default:            BOOST_ASSERT(false);            break;        }        return true;    }    bool pubcomp_handler(        con_sp_t spep,        packet_id_t packet_id,        v5::pubcomp_reason_code /*reason_code*/,        v5::properties /*props*/){        std::shared_lock<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_con>();        auto it = idx.find(spep);        // broker uses async_* APIs        // If broker erase a connection, then async_force_disconnect()        // and/or async_force_disconnect () is called.        // During async operation, spep is valid but it has already been        // erased from sessions_        if (it == idx.end()) return true;        // const_cast is appropriate here        // See https://github.com/boostorg/multi_index/issues/50        auto& ss = const_cast<session_state&>(*it);        ss.erase_inflight_message_by_packet_id(packet_id);        ss.send_offline_messages_by_packet_id_release();        return true;    }    bool subscribe_handler(        con_sp_t spep,        packet_id_t packet_id,        std::vector<subscribe_entry> entries,        v5::properties props) {        auto& ep = *spep;        std::shared_lock<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_con>();        auto it = idx.find(spep);        // broker uses async_* APIs        // If broker erase a connection, then async_force_disconnect()        // and/or async_force_disconnect () is called.        // During async operation, spep is valid but it has already been        // erased from sessions_        if (it == idx.end()) return true;        // The element of sessions_ must have longer lifetime        // than corresponding subscription.        // Because the subscription store the reference of the element.        optional<session_state_ref> ssr_opt;        // const_cast is appropriate here        // See https://github.com/boostorg/multi_index/issues/50        auto& ss = const_cast<session_state&>(*it);        ssr_opt.emplace(ss);        BOOST_ASSERT(ssr_opt);        session_state_ref ssr {ssr_opt.value()};        auto publish_proc =            [this, &ssr](retain_t const& r, qos qos_value, optional<std::size_t> sid) {                auto props = r.props;                if (sid) {                    props.push_back(v5::property::subscription_identifier(*sid));                }                if (r.tim_message_expiry) {                    auto d =                        std::chrono::duration_cast<std::chrono::seconds>(                            r.tim_message_expiry->expiry() - std::chrono::steady_clock::now()                        ).count();                    set_property<v5::property::message_expiry_interval>(                        props,                        v5::property::message_expiry_interval(                            static_cast<uint32_t>(d)                        )                    );                }                ssr.get().publish(                    timer_ioc_,                    r.topic,                    r.contents,                    std::min(r.qos_value, qos_value) | MQTT_NS::retain::yes,                    props                );            };        std::vector<std::function<void()>> retain_deliver;        retain_deliver.reserve(entries.size());        // subscription identifier        optional<std::size_t> sid;        // An in-order list of qos settings, used to send the reply.        // The MQTT protocol 3.1.1 - 3.8.4 Response - paragraph 6        // allows the server to grant a lower QOS than requested        // So we reply with the QOS setting that was granted        // not the one requested.        switch (ep.get_protocol_version()) {        case protocol_version::v3_1_1: {            std::vector<suback_return_code> res;            res.reserve(entries.size());            for (auto& e : entries) {                if (security.is_subscribe_authorized(ss.get_username(), e.topic_filter)) {                    res.emplace_back(qos_to_suback_return_code(e.subopts.get_qos())); // converts to granted_qos_x                    ssr.get().subscribe(                        force_move(e.share_name),                        e.topic_filter,                        e.subopts,                        [&] {                            std::shared_lock<mutex> g(mtx_retains_);                            retains_.find(                                e.topic_filter,                                [&](retain_t const& r) {                                    retain_deliver.emplace_back(                                        [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {                                            publish_proc(r, qos_value, sid);                                        }                                    );                                }                            );                        }                    );                }                else {                    // User not authorized to subscribe to topic filter                    res.emplace_back(suback_return_code::failure);                }            }            // Acknowledge the subscriptions, and the registered QOS settings            ep.async_suback(                packet_id,                force_move(res),                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );        } break;        case protocol_version::v5: {            // Get subscription identifier            auto v = get_property<v5::property::subscription_identifier>(props);            if (v && v.value().val() != 0) {                sid.emplace(v.value().val());            }            std::vector<v5::suback_reason_code> res;            res.reserve(entries.size());            for (auto& e : entries) {                if (security.is_subscribe_authorized(ss.get_username(), e.topic_filter)) {                    res.emplace_back(v5::qos_to_suback_reason_code(e.subopts.get_qos())); // converts to granted_qos_x                    ssr.get().subscribe(                        force_move(e.share_name),                        e.topic_filter,                        e.subopts,                        [&] {                            std::shared_lock<mutex> g(mtx_retains_);                            retains_.find(                                e.topic_filter,                                [&](retain_t const& r) {                                    retain_deliver.emplace_back(                                        [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {                                            publish_proc(r, qos_value, sid);                                        }                                    );                                }                            );                        },                        sid                    );                }                else {                    // User not authorized to subscribe to topic filter                    res.emplace_back(v5::suback_reason_code::not_authorized);                }            }            if (h_subscribe_props_) h_subscribe_props_(props);            // Acknowledge the subscriptions, and the registered QOS settings            ep.async_suback(                packet_id,                force_move(res),                suback_props_,                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );        } break;        default:            BOOST_ASSERT(false);            break;        }        for (auto const& f : retain_deliver) {            f();        }        return true;    }    bool unsubscribe_handler(        con_sp_t spep,        packet_id_t packet_id,        std::vector<unsubscribe_entry> entries,        v5::properties props) {        auto& ep = *spep;        std::shared_lock<mutex> g(mtx_sessions_);        auto& idx = sessions_.get<tag_con>();        auto it  = idx.find(spep);        // broker uses async_* APIs        // If broker erase a connection, then async_force_disconnect()        // and/or async_force_disconnect () is called.        // During async operation, spep is valid but it has already been        // erased from sessions_        if (it == idx.end()) return true;        // The element of sessions_ must have longer lifetime        // than corresponding subscription.        // Because the subscription store the reference of the element.        optional<session_state_ref> ssr_opt;        // const_cast is appropriate here        // See https://github.com/boostorg/multi_index/issues/50        auto& ss = const_cast<session_state&>(*it);        ssr_opt.emplace(ss);        BOOST_ASSERT(ssr_opt);        session_state_ref ssr {ssr_opt.value()};        // For each subscription that this connection has        // Compare against the list of topic filters, and remove        // the subscription if the topic filter is in the list.        for (auto const& e : entries) {            ssr.get().unsubscribe(e.share_name, e.topic_filter);        }        switch (ep.get_protocol_version()) {        case protocol_version::v3_1_1:            ep.async_unsuback(                packet_id,                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );            break;        case protocol_version::v5:            if (h_unsubscribe_props_) h_unsubscribe_props_(props);            ep.async_unsuback(                packet_id,                std::vector<v5::unsuback_reason_code>(                    entries.size(),                    v5::unsuback_reason_code::success                ),                unsuback_props_,                [spep = force_move(spep)]                (error_code ec) {                    if (ec) {                        MQTT_LOG("mqtt_broker", info)                            << MQTT_ADD_VALUE(address, spep.get())                            << ec.message();                    }                }            );            break;        default:            BOOST_ASSERT(false);            break;        }        return true;    }    /**     * @brief do_publish Publish a message to any subscribed clients.     *     * @param source_ss - soource session_state.     * @param topic - The topic to publish the message on.     * @param contents - The contents of the message.     * @param pubopts - publish options     * @param props - properties     */    void do_publish(        session_state const& source_ss,        buffer topic,        buffer contents,        publish_options pubopts,        v5::properties props    ) {        // Get auth rights for this topic        // auth_users prepared once here, and then referred multiple times in subs_map_.modify() for efficiency        auto auth_users = security.auth_sub(topic);        // publish the message to subscribers.        // retain is delivered as the original only if rap_value is rap::retain.        // On MQTT v3.1.1, rap_value is always rap::dont.        auto deliver =            [&] (session_state& ss, subscription& sub, auto const& auth_users) {                // See if this session is authorized to subscribe this topic                auto access = security.auth_sub_user(auth_users, ss.get_username());                if (access != security::authorization::type::allow) return;                publish_options new_pubopts = std::min(pubopts.get_qos(), sub.subopts.get_qos());                if (sub.subopts.get_rap() == rap::retain && pubopts.get_retain() == MQTT_NS::retain::yes) {                    new_pubopts |= MQTT_NS::retain::yes;                }                if (sub.sid) {                    props.push_back(v5::property::subscription_identifier(sub.sid.value()));                    ss.deliver(                        timer_ioc_,                        topic,                        contents,                        new_pubopts,                        props                    );                    props.pop_back();                }                else {                    ss.deliver(                        timer_ioc_,                        topic,                        contents,                        new_pubopts,                        props                    );                }            };        //                  share_name   topic_filter        std::set<std::tuple<string_view, string_view>> sent;        {            std::shared_lock<mutex> g{mtx_subs_map_};            subs_map_.modify(                topic,                [&](buffer const& /*key*/, subscription& sub) {                    if (sub.share_name.empty()) {                        // Non shared subscriptions                        // If NL (no local) subscription option is set and                        // publisher is the same as subscriber, then skip it.                        if (sub.subopts.get_nl() == nl::yes &&                            sub.ss.get().client_id() ==  source_ss.client_id()) return;                        deliver(sub.ss.get(), sub, auth_users);                    }                    else {                        // Shared subscriptions                        bool inserted;                        std::tie(std::ignore, inserted) = sent.emplace(sub.share_name, sub.topic_filter);                        if (inserted) {                            if (auto ssr_opt = shared_targets_.get_target(sub.share_name, sub.topic_filter)) {                                deliver(ssr_opt.value().get(), sub, auth_users);                            }                        }                    }                }            );        }        optional<std::chrono::steady_clock::duration> message_expiry_interval;        if (source_ss.get_protocol_version() == protocol_version::v5) {            auto v = get_property<v5::property::message_expiry_interval>(props);            if (v) {                message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));            }        }        /*         * If the message is marked as being retained, then we         * keep it in case a new subscription is added that matches         * this topic.         *         * @note: The MQTT standard 3.3.1.3 RETAIN makes it clear that         *        retained messages are global based on the topic, and         *        are not scoped by the client id. So any client may         *        publish a retained message on any topic, and the most         *        recently published retained message on a particular         *        topic is the message that is stored on the server.         *         * @note: The standard doesn't make it clear that publishing         *        a message with zero length, but the retain flag not         *        set, does not result in any existing retained message         *        being removed. However, internet searching indicates         *        that most brokers have opted to keep retained messages         *        when receiving contents of zero bytes, unless the so         *        received message has the retain flag set, in which case         *        the retained message is removed.         */        if (pubopts.get_retain() == MQTT_NS::retain::yes) {            if (contents.empty()) {                std::lock_guard<mutex> g(mtx_retains_);                retains_.erase(topic);            }            else {                std::shared_ptr<as::steady_timer> tim_message_expiry;                if (message_expiry_interval) {                    tim_message_expiry = std::make_shared<as::steady_timer>(timer_ioc_, message_expiry_interval.value());                    tim_message_expiry->async_wait(                        [this, topic = topic, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]                        (boost::system::error_code const& ec) {                            if (auto sp = wp.lock()) {                                if (!ec) {                                    retains_.erase(topic);                                }                            }                        }                    );                }                std::lock_guard<mutex> g(mtx_retains_);                retains_.insert_or_assign(                    topic,                    retain_t {                        force_move(topic),                        force_move(contents),                        force_move(props),                        pubopts.get_qos(),                        tim_message_expiry                    }                );            }        }    }private:    as::io_context& timer_ioc_; ///< The boost asio context to run this broker on.    as::steady_timer tim_disconnect_; ///< Used to delay disconnect handling for testing    optional<std::chrono::steady_clock::duration> delay_disconnect_; ///< Used to delay disconnect handling for testing    // Authorization and authentication settings    broker::security security;    mutable mutex mtx_subs_map_;    sub_con_map subs_map_;   /// subscription information    shared_target shared_targets_; /// shared subscription targets    ///< Map of active client id and connections    /// session_state has references of subs_map_ and shared_targets_.    /// because session_state (member of sessions_) has references of subs_map_ and shared_targets_.    mutable mutex mtx_sessions_;    session_states sessions_;    mutable mutex mtx_retains_;    retained_messages retains_; ///< A list of messages retained so they can be sent to newly subscribed clients.    // MQTTv5 members    v5::properties connack_props_;    v5::properties suback_props_;    v5::properties unsuback_props_;    v5::properties puback_props_;    v5::properties pubrec_props_;    v5::properties pubrel_props_;    v5::properties pubcomp_props_;    std::function<void(v5::properties const&)> h_connect_props_;    std::function<void(v5::properties const&)> h_disconnect_props_;    std::function<void(v5::properties const&)> h_publish_props_;    std::function<void(v5::properties const&)> h_puback_props_;    std::function<void(v5::properties const&)> h_pubrec_props_;    std::function<void(v5::properties const&)> h_pubrel_props_;    std::function<void(v5::properties const&)> h_pubcomp_props_;    std::function<void(v5::properties const&)> h_subscribe_props_;    std::function<void(v5::properties const&)> h_unsubscribe_props_;    std::function<void(v5::properties const&)> h_auth_props_;    bool pingresp_ = true;    bool connack_ = true;};MQTT_BROKER_NS_END#endif // MQTT_BROKER_BROKER_HPP
 |