| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382 | // Copyright Takatoshi Kondo 2017//// Distributed under the Boost Software License, Version 1.0.// (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)#if !defined(MQTT_SERVER_HPP)#define MQTT_SERVER_HPP#include <mqtt/variant.hpp> // should be top to configure variant limit#include <memory>#include <boost/asio.hpp>#include <mqtt/namespace.hpp>#include <mqtt/tcp_endpoint.hpp>#include <mqtt/endpoint.hpp>#include <mqtt/move.hpp>#include <mqtt/callable_overlay.hpp>#include <mqtt/strand.hpp>#include <mqtt/null_strand.hpp>namespace MQTT_NS {namespace as = boost::asio;template <typename Mutex, template<typename...> class LockGuard, std::size_t PacketIdBytes>class server_endpoint : public endpoint<Mutex, LockGuard, PacketIdBytes> {public:    using endpoint<Mutex, LockGuard, PacketIdBytes>::endpoint;protected:    void on_pre_send() noexcept override {}    void on_close() noexcept override {}    void on_error(error_code /*ec*/) noexcept override {}protected:    ~server_endpoint() = default;};template <    typename Strand = strand,    typename Mutex = std::mutex,    template<typename...> class LockGuard = std::lock_guard,    std::size_t PacketIdBytes = 2>class server {public:    using socket_t = tcp_endpoint<as::ip::tcp::socket, Strand>;    using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;    /**     * @brief Accept handler     *        After this handler called, the next accept will automatically start.     * @param ep endpoint of the connecting client     */    using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;    /**     * @brief Error handler during after accepted before connection established     *        After this handler called, the next accept will automatically start.     * @param ec error code     * @param ioc_con io_context for incoming connection     */    using connection_error_handler = std::function<void(error_code ec, as::io_context& ioc_con)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     */    using error_handler = std::function<void(error_code ec)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     * @param ioc_con io_context for listen or accept     */    using error_handler_with_ioc = std::function<void(error_code ec, as::io_context& ioc_accept)>;    template <typename AsioEndpoint, typename AcceptorConfig>    server(        AsioEndpoint&& ep,        as::io_context& ioc_accept,        as::io_context& ioc_con,        AcceptorConfig&& config)        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_(&ioc_con),          ioc_con_getter_([this]() -> as::io_context& { return *ioc_con_; }),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)) {        config_(acceptor_.value());    }    template <typename AsioEndpoint>    server(        AsioEndpoint&& ep,        as::io_context& ioc_accept,        as::io_context& ioc_con)        : server(std::forward<AsioEndpoint>(ep), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server(        AsioEndpoint&& ep,        as::io_context& ioc,        AcceptorConfig&& config)        : server(std::forward<AsioEndpoint>(ep), ioc, ioc, std::forward<AcceptorConfig>(config)) {}    template <typename AsioEndpoint>    server(        AsioEndpoint&& ep,        as::io_context& ioc)        : server(std::forward<AsioEndpoint>(ep), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server(        AsioEndpoint&& ep,        as::io_context& ioc_accept,        std::function<as::io_context&()> ioc_con_getter,        AcceptorConfig&& config = [](as::ip::tcp::acceptor&) {})        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_getter_(force_move(ioc_con_getter)),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)) {        config_(acceptor_.value());    }    void listen() {        close_request_ = false;        if (!acceptor_) {            try {                acceptor_.emplace(ioc_accept_, ep_);                config_(acceptor_.value());            }            catch (boost::system::system_error const& e) {                as::post(                    ioc_accept_,                    [this, ec = e.code()] {                        if (h_error_) h_error_(ec, ioc_accept_);                    }                );                return;            }        }        do_accept();    }    unsigned short port() const { return acceptor_.value().local_endpoint().port(); }    void close() {        close_request_ = true;        as::post(            ioc_accept_,            [this] {                acceptor_.reset();            }        );    }    void set_accept_handler(accept_handler h = accept_handler()) {        h_accept_ = force_move(h);    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler h) {        h_error_ =            [h = force_move(h)]            (error_code ec, as::io_context&) {                if (h) h(ec);            };    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler_with_ioc h = error_handler_with_ioc()) {        h_error_ = force_move(h);    }    /**     * @brief Set error handler     * @param h handler     */    void set_connection_error_handler(connection_error_handler h = connection_error_handler()) {        h_connection_error_ = force_move(h);    }    /**     * @brief Set MQTT protocol version     * @param version accepting protocol version     * If the specific version is set, only set version is accepted.     * If the version is set to protocol_version::undetermined, all versions are accepted.     * Initial value is protocol_version::undetermined.     */    void set_protocol_version(protocol_version version) {        version_ = version;    }private:    void do_accept() {        if (close_request_) return;        auto& ioc_con = ioc_con_getter_();        auto socket = std::make_shared<socket_t>(ioc_con);        acceptor_.value().async_accept(            socket->lowest_layer(),            [this, socket, &ioc_con]            (error_code ec) mutable {                if (ec) {                    acceptor_.reset();                    if (h_error_) h_error_(ec, ioc_con);                    return;                }                auto sp = std::make_shared<endpoint_t>(ioc_con, force_move(socket), version_);                if (h_accept_) h_accept_(force_move(sp));                do_accept();            }        );    }private:    as::ip::tcp::endpoint ep_;    as::io_context& ioc_accept_;    as::io_context* ioc_con_ = nullptr;    std::function<as::io_context&()> ioc_con_getter_;    optional<as::ip::tcp::acceptor> acceptor_;    std::function<void(as::ip::tcp::acceptor&)> config_;    bool close_request_{false};    accept_handler h_accept_;    connection_error_handler h_connection_error_;    error_handler_with_ioc h_error_;    protocol_version version_ = protocol_version::undetermined;};#if defined(MQTT_USE_TLS)template <    typename Strand = strand,    typename Mutex = std::mutex,    template<typename...> class LockGuard = std::lock_guard,    std::size_t PacketIdBytes = 2>class server_tls {public:    using socket_t = tcp_endpoint<tls::stream<as::ip::tcp::socket>, Strand>;    using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;    /**     * @brief Accept handler     *        After this handler called, the next accept will automatically start.     * @param ep endpoint of the connecting client     */    using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;    /**     * @brief Error handler during after accepted before connection established     *        After this handler called, the next accept will automatically start.     * @param ec error code     * @param ioc_con io_context for incoming connection     */    using connection_error_handler = std::function<void(error_code ec, as::io_context& ioc_con)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     */    using error_handler = std::function<void(error_code ec)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     * @param ioc_con io_context for listen or accept     */    using error_handler_with_ioc = std::function<void(error_code ec, as::io_context& ioc_accept)>;    template <typename AsioEndpoint, typename AcceptorConfig>    server_tls(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc_accept,        as::io_context& ioc_con,        AcceptorConfig&& config)        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_(&ioc_con),          ioc_con_getter_([this]() -> as::io_context& { return *ioc_con_; }),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)),          ctx_(force_move(ctx)) {        config_(acceptor_.value());    }    template <typename AsioEndpoint>    server_tls(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc_accept,        as::io_context& ioc_con)        : server_tls(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server_tls(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc,        AcceptorConfig&& config)        : server_tls(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, std::forward<AcceptorConfig>(config)) {}    template <typename AsioEndpoint>    server_tls(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc)        : server_tls(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server_tls(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc_accept,        std::function<as::io_context&()> ioc_con_getter,        AcceptorConfig&& config = [](as::ip::tcp::acceptor&) {})        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_getter_(force_move(ioc_con_getter)),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)),          ctx_(force_move(ctx)) {        config_(acceptor_.value());    }    void listen() {        close_request_ = false;        if (!acceptor_) {            try {                acceptor_.emplace(ioc_accept_, ep_);                config_(acceptor_.value());            }            catch (boost::system::system_error const& e) {                as::post(                    ioc_accept_,                    [this, ec = e.code()] {                        if (h_error_) h_error_(ec, ioc_accept_);                    }                );                return;            }        }        do_accept();    }    unsigned short port() const { return acceptor_.value().local_endpoint().port(); }    void close() {        close_request_ = true;        as::post(            ioc_accept_,            [this] {                acceptor_.reset();            }        );    }    void set_accept_handler(accept_handler h = accept_handler()) {        h_accept_ = force_move(h);    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler h) {        h_error_ =            [h = force_move(h)]            (error_code ec, as::io_context&) {                if (h) h(ec);            };    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler_with_ioc h = error_handler_with_ioc()) {        h_error_ = force_move(h);    }    /**     * @brief Set error handler     * @param h handler     */    void set_connection_error_handler(connection_error_handler h = connection_error_handler()) {        h_connection_error_ = force_move(h);    }    /**     * @brief Set MQTT protocol version     * @param version accepting protocol version     * If the specific version is set, only set version is accepted.     * If the version is set to protocol_version::undetermined, all versions are accepted.     * Initial value is protocol_version::undetermined.     */    void set_protocol_version(protocol_version version) {        version_ = version;    }    /**     * @bried Set underlying layer connection timeout.     * The timer is set after TCP layer connection accepted.     * The timer is cancelled just before accept handler is called.     * If the timer is fired, the endpoint is removed, the socket is automatically closed.     * The default timeout value is 10 seconds.     * @param timeout timeout value     */    void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {        underlying_connect_timeout_ = force_move(timeout);    }    /**     * @brief Get boost asio ssl context.     * @return ssl context     */    tls::context& get_ssl_context() {        return ctx_;    }    /**     * @brief Get boost asio ssl context.     * @return ssl context     */    tls::context const& get_ssl_context() const {        return ctx_;    }    using verify_cb_t = std::function<bool (bool, boost::asio::ssl::verify_context&, std::shared_ptr<optional<std::string>> const&) >;    void set_verify_callback(verify_cb_t verify_cb) {        verify_cb_with_username_ = verify_cb;    }private:    void do_accept() {        if (close_request_) return;        auto& ioc_con = ioc_con_getter_();        auto socket = std::make_shared<socket_t>(ioc_con, ctx_);        auto ps = socket.get();        acceptor_.value().async_accept(            ps->lowest_layer(),            [this, socket = force_move(socket), &ioc_con]            (error_code ec) mutable {                if (ec) {                    acceptor_.reset();                    if (h_error_) h_error_(ec, ioc_con);                    return;                }                auto underlying_finished = std::make_shared<bool>(false);                auto connection_error_called = std::make_shared<bool>(false);                auto tim = std::make_shared<as::steady_timer>(ioc_con);                tim->expires_after(underlying_connect_timeout_);                tim->async_wait(                    [                        this,                        socket,                        tim,                        underlying_finished,                        connection_error_called,                        &ioc_con                    ]                    (error_code ec) {                        if (*underlying_finished) return;                        if (ec) return; // timer cancelled                        socket->post(                            [this, socket, connection_error_called, &ioc_con] {                                boost::system::error_code close_ec;                                socket->lowest_layer().close(close_ec);                                if (h_connection_error_ && !*connection_error_called) {                                    h_connection_error_(                                        boost::system::errc::make_error_code(                                            boost::system::errc::stream_timeout                                        ),                                        ioc_con                                    );                                    *connection_error_called = true;                                }                            }                        );                    }                );                auto ps = socket.get();                auto username = std::make_shared<optional<std::string>>(); // shared_ptr for username                auto verify_cb_ = [this, username] // copy capture socket shared_ptr                    (bool preverified, boost::asio::ssl::verify_context& ctx) {                        // user can set username in the callback                        return verify_cb_with_username_                            ? verify_cb_with_username_(preverified, ctx, username)                            : false;                };                ctx_.set_verify_mode(MQTT_NS::tls::verify_peer);                ctx_.set_verify_callback(verify_cb_);                ps->async_handshake(                    tls::stream_base::server,                    [                        this,                        socket = force_move(socket),                        tim,                        underlying_finished,                        connection_error_called,                        &ioc_con,                        username                    ]                    (error_code ec) mutable {                        *underlying_finished = true;                        tim->cancel();                        if (ec) {                            if (h_connection_error_ && !*connection_error_called) {                                h_connection_error_(ec, ioc_con);                                *connection_error_called = true;                            }                            return;                        }                        auto sp = std::make_shared<endpoint_t>(ioc_con, force_move(socket), version_);                        sp->set_preauthed_user_name(*username);                        if (h_accept_) h_accept_(force_move(sp));                    }                );                do_accept();            }        );    }private:    verify_cb_t verify_cb_with_username_;    as::ip::tcp::endpoint ep_;    as::io_context& ioc_accept_;    as::io_context* ioc_con_ = nullptr;    std::function<as::io_context&()> ioc_con_getter_;    optional<as::ip::tcp::acceptor> acceptor_;    std::function<void(as::ip::tcp::acceptor&)> config_;    bool close_request_{false};    accept_handler h_accept_;    connection_error_handler h_connection_error_;    error_handler_with_ioc h_error_;    tls::context ctx_;    protocol_version version_ = protocol_version::undetermined;    std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);};#endif // defined(MQTT_USE_TLS)#if defined(MQTT_USE_WS)template <    typename Strand = strand,    typename Mutex = std::mutex,    template<typename...> class LockGuard = std::lock_guard,    std::size_t PacketIdBytes = 2>class server_ws {public:    using socket_t = ws_endpoint<as::ip::tcp::socket, Strand>;    using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;    /**     * @brief Accept handler     * @param ep endpoint of the connecting client     */    using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;    /**     * @brief Error handler during after accepted before connection established     *        After this handler called, the next accept will automatically start.     * @param ec error code     * @param ioc_con io_context for incoming connection     */    using connection_error_handler = std::function<void(error_code ec, as::io_context& ioc_con)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     */    using error_handler = std::function<void(error_code ec)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     * @param ioc_con io_context for listen or accept     */    using error_handler_with_ioc = std::function<void(error_code ec, as::io_context& ioc_accept)>;    template <typename AsioEndpoint, typename AcceptorConfig>    server_ws(        AsioEndpoint&& ep,        as::io_context& ioc_accept,        as::io_context& ioc_con,        AcceptorConfig&& config)        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_(&ioc_con),          ioc_con_getter_([this]() -> as::io_context& { return *ioc_con_; }),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)) {        config_(acceptor_.value());    }    template <typename AsioEndpoint>    server_ws(        AsioEndpoint&& ep,        as::io_context& ioc_accept,        as::io_context& ioc_con)        : server_ws(std::forward<AsioEndpoint>(ep), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server_ws(        AsioEndpoint&& ep,        as::io_context& ioc,        AcceptorConfig&& config)        : server_ws(std::forward<AsioEndpoint>(ep), ioc, ioc, std::forward<AcceptorConfig>(config)) {}    template <typename AsioEndpoint>    server_ws(        AsioEndpoint&& ep,        as::io_context& ioc)        : server_ws(std::forward<AsioEndpoint>(ep), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server_ws(        AsioEndpoint&& ep,        as::io_context& ioc_accept,        std::function<as::io_context&()> ioc_con_getter,        AcceptorConfig&& config = [](as::ip::tcp::acceptor&) {})        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_getter_(force_move(ioc_con_getter)),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)) {        config_(acceptor_.value());    }    void listen() {        close_request_ = false;        if (!acceptor_) {            try {                acceptor_.emplace(ioc_accept_, ep_);                config_(acceptor_.value());            }            catch (boost::system::system_error const& e) {                as::post(                    ioc_accept_,                    [this, ec = e.code()] {                        if (h_error_) h_error_(ec, ioc_accept_);                    }                );                return;            }        }        do_accept();    }    unsigned short port() const { return acceptor_.value().local_endpoint().port(); }    void close() {        close_request_ = true;        as::post(            ioc_accept_,            [this] {                acceptor_.reset();            }        );    }    void set_accept_handler(accept_handler h = accept_handler()) {        h_accept_ = force_move(h);    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler h) {        h_error_ =            [h = force_move(h)]            (error_code ec, as::io_context&) {                if (h) h(ec);            };    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler_with_ioc h = error_handler_with_ioc()) {        h_error_ = force_move(h);    }    /**     * @brief Set error handler     * @param h handler     */    void set_connection_error_handler(connection_error_handler h = connection_error_handler()) {        h_connection_error_ = force_move(h);    }    /**     * @brief Set MQTT protocol version     * @param version accepting protocol version     * If the specific version is set, only set version is accepted.     * If the version is set to protocol_version::undetermined, all versions are accepted.     * Initial value is protocol_version::undetermined.     */    void set_protocol_version(protocol_version version) {        version_ = version;    }    /**     * @bried Set underlying layer connection timeout.     * The timer is set after TCP layer connection accepted.     * The timer is cancelled just before accept handler is called.     * If the timer is fired, the endpoint is removed, the socket is automatically closed.     * The default timeout value is 10 seconds.     * @param timeout timeout value     */    void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {        underlying_connect_timeout_ = force_move(timeout);    }private:    void do_accept() {        if (close_request_) return;        auto& ioc_con = ioc_con_getter_();        auto socket = std::make_shared<socket_t>(ioc_con);        auto ps = socket.get();        acceptor_.value().async_accept(            ps->next_layer(),            [this, socket = force_move(socket), &ioc_con]            (error_code ec) mutable {                if (ec) {                    acceptor_.reset();                    if (h_error_) h_error_(ec, ioc_con);                    return;                }                auto underlying_finished = std::make_shared<bool>(false);                auto connection_error_called = std::make_shared<bool>(false);                auto tim = std::make_shared<as::steady_timer>(ioc_con);                tim->expires_after(underlying_connect_timeout_);                tim->async_wait(                    [                        this,                        socket,                        tim,                        underlying_finished,                        connection_error_called,                        &ioc_con                    ]                    (error_code ec) {                        if (*underlying_finished) return;                        if (ec) return; // timer cancelled                        socket->post(                            [this, socket, connection_error_called, &ioc_con] {                                boost::system::error_code close_ec;                                socket->lowest_layer().close(close_ec);                                if (h_connection_error_ && !*connection_error_called) {                                    h_connection_error_(                                        boost::system::errc::make_error_code(                                            boost::system::errc::stream_timeout                                        ),                                        ioc_con                                    );                                    *connection_error_called = true;                                }                            }                        );                    }                );                auto sb = std::make_shared<boost::asio::streambuf>();                auto request = std::make_shared<boost::beast::http::request<boost::beast::http::string_body>>();                auto ps = socket.get();                boost::beast::http::async_read(                    ps->next_layer(),                    *sb,                    *request,                    [                        this,                        socket = force_move(socket),                        sb,                        request,                        tim,                        underlying_finished,                        connection_error_called,                        &ioc_con                    ]                    (error_code ec, std::size_t) mutable {                        if (ec) {                            *underlying_finished = true;                            tim->cancel();                            if (h_connection_error_ && !*connection_error_called) {                                h_connection_error_(ec, ioc_con);                                *connection_error_called = true;                            }                            return;                        }                        if (!boost::beast::websocket::is_upgrade(*request)) {                            *underlying_finished = true;                            tim->cancel();                            if (h_connection_error_ && !*connection_error_called) {                                h_connection_error_(                                    boost::system::errc::make_error_code(                                        boost::system::errc::protocol_error                                    ),                                    ioc_con                                );                                *connection_error_called = true;                            }                            return;                        }                        auto ps = socket.get();#if BOOST_BEAST_VERSION >= 248                        auto it = request->find("Sec-WebSocket-Protocol");                        if (it != request->end()) {                            ps->set_option(                                boost::beast::websocket::stream_base::decorator(                                    [name = it->name(), value = it->value()] // name is enum, value is boost::string_view                                    (boost::beast::websocket::response_type& res) {                                        // This lambda is called before the scope out point *1                                        res.set(name, value);                                    }                                )                            );                        }                        ps->async_accept(                            *request,                            [                                this,                                socket = force_move(socket),                                tim,                                underlying_finished,                                connection_error_called,                                &ioc_con                            ]                            (error_code ec) mutable {                                *underlying_finished = true;                                tim->cancel();                                if (ec) {                                    if (h_connection_error_ && !*connection_error_called) {                                        h_connection_error_(ec, ioc_con);                                    }                                    *connection_error_called = true;                                    return;                                }                                auto sp = std::make_shared<endpoint_t>(ioc_con, force_move(socket), version_);                                if (h_accept_) h_accept_(force_move(sp));                            }                        );#else  // BOOST_BEAST_VERSION >= 248                        ps->async_accept_ex(                            *request,                            [request, connection_error_called]                            (boost::beast::websocket::response_type& m) {                                auto it = request->find("Sec-WebSocket-Protocol");                                if (it != request->end()) {                                    m.insert(it->name(), it->value());                                }                            },                            [this, socket = force_move(socket), tim, underlying_finished, &ioc_con]                            (error_code ec) mutable {                                *underlying_finished = true;                                tim->cancel();                                if (ec) {                                    if (h_connection_error_ && !*connection_error_called) {                                        h_connection_error_(ec, ioc_con);                                        *connection_error_called = true;                                    }                                    return;                                }                                auto sp = std::make_shared<endpoint_t>(ioc_con, force_move(socket), version_);                                if (h_accept_) h_accept_(force_move(sp));                            }                        );#endif // BOOST_BEAST_VERSION >= 248                        // scope out point *1                    }                );                do_accept();            }        );    }private:    as::ip::tcp::endpoint ep_;    as::io_context& ioc_accept_;    as::io_context* ioc_con_ = nullptr;    std::function<as::io_context&()> ioc_con_getter_;    optional<as::ip::tcp::acceptor> acceptor_;    std::function<void(as::ip::tcp::acceptor&)> config_;    bool close_request_{false};    accept_handler h_accept_;    connection_error_handler h_connection_error_;    error_handler_with_ioc h_error_;    protocol_version version_ = protocol_version::undetermined;    std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);};#if defined(MQTT_USE_TLS)template <    typename Strand = strand,    typename Mutex = std::mutex,    template<typename...> class LockGuard = std::lock_guard,    std::size_t PacketIdBytes = 2>class server_tls_ws {public:    using socket_t = ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>;    using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;    /**     * @brief Accept handler     * @param ep endpoint of the connecting client     */    using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;    /**     * @brief Error handler during after accepted before connection established     *        After this handler called, the next accept will automatically start.     * @param ec error code     * @param ioc_con io_context for incoming connection     */    using connection_error_handler = std::function<void(error_code ec, as::io_context& ioc_con)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     */    using error_handler = std::function<void(error_code ec)>;    /**     * @brief Error handler for listen and accpet     *        After this handler called, the next accept won't  start     *        You need to call listen() again if you want to restart accepting.     * @param ec error code     * @param ioc_con io_context for listen or accept     */    using error_handler_with_ioc = std::function<void(error_code ec, as::io_context& ioc_accept)>;    template <typename AsioEndpoint, typename AcceptorConfig>    server_tls_ws(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc_accept,        as::io_context& ioc_con,        AcceptorConfig&& config)        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_(&ioc_con),          ioc_con_getter_([this]() -> as::io_context& { return *ioc_con_; }),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)),          ctx_(force_move(ctx)) {        config_(acceptor_.value());    }    template <typename AsioEndpoint>    server_tls_ws(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc_accept,        as::io_context& ioc_con)        : server_tls_ws(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server_tls_ws(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc,        AcceptorConfig&& config)        : server_tls_ws(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, std::forward<AcceptorConfig>(config)) {}    template <typename AsioEndpoint>    server_tls_ws(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc)        : server_tls_ws(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}    template <typename AsioEndpoint, typename AcceptorConfig>    server_tls_ws(        AsioEndpoint&& ep,        tls::context&& ctx,        as::io_context& ioc_accept,        std::function<as::io_context&()> ioc_con_getter,        AcceptorConfig&& config = [](as::ip::tcp::acceptor&) {})        : ep_(std::forward<AsioEndpoint>(ep)),          ioc_accept_(ioc_accept),          ioc_con_getter_(force_move(ioc_con_getter)),          acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),          config_(std::forward<AcceptorConfig>(config)),          ctx_(force_move(ctx)) {        config_(acceptor_.value());    }    void listen() {        close_request_ = false;        if (!acceptor_) {            try {                acceptor_.emplace(ioc_accept_, ep_);                config_(acceptor_.value());            }            catch (boost::system::system_error const& e) {                as::post(                    ioc_accept_,                    [this, ec = e.code()] {                        if (h_error_) h_error_(ec, ioc_accept_);                    }                );                return;            }        }        do_accept();    }    unsigned short port() const { return acceptor_.value().local_endpoint().port(); }    void close() {        close_request_ = true;        as::post(            ioc_accept_,            [this] {                acceptor_.reset();            }        );    }    void set_accept_handler(accept_handler h = accept_handler()) {        h_accept_ = force_move(h);    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler h) {        h_error_ =            [h = force_move(h)]            (error_code ec, as::io_context&) {                if (h) h(ec);            };    }    /**     * @brief Set error handler for listen and accept     * @param h handler     */    void set_error_handler(error_handler_with_ioc h = error_handler_with_ioc()) {        h_error_ = force_move(h);    }    /**     * @brief Set error handler     * @param h handler     */    void set_connection_error_handler(connection_error_handler h = connection_error_handler()) {        h_connection_error_ = force_move(h);    }    /**     * @brief Set MQTT protocol version     * @param version accepting protocol version     * If the specific version is set, only set version is accepted.     * If the version is set to protocol_version::undetermined, all versions are accepted.     * Initial value is protocol_version::undetermined.     */    void set_protocol_version(protocol_version version) {        version_ = version;    }    /**     * @bried Set underlying layer connection timeout.     * The timer is set after TCP layer connection accepted.     * The timer is cancelled just before accept handler is called.     * If the timer is fired, the endpoint is removed, the socket is automatically closed.     * The default timeout value is 10 seconds.     * @param timeout timeout value     */    void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {        underlying_connect_timeout_ = force_move(timeout);    }    /**     * @brief Get boost asio ssl context.     * @return ssl context     */    tls::context& get_ssl_context() {        return ctx_;    }    /**     * @brief Get boost asio ssl context.     * @return ssl context     */    tls::context const& get_ssl_context() const {        return ctx_;    }    using verify_cb_t = std::function<bool (bool, boost::asio::ssl::verify_context&, std::shared_ptr<optional<std::string>> const&) >;    void set_verify_callback(verify_cb_t verify_cb) {        verify_cb_with_username_ = verify_cb;    }private:    void do_accept() {        if (close_request_) return;        auto& ioc_con = ioc_con_getter_();        auto socket = std::make_shared<socket_t>(ioc_con, ctx_);        auto ps = socket.get();        acceptor_.value().async_accept(            ps->next_layer().next_layer(),            [this, socket = force_move(socket), &ioc_con]            (error_code ec) mutable {                if (ec) {                    acceptor_.reset();                    if (h_error_) h_error_(ec, ioc_con);                    return;                }                auto underlying_finished = std::make_shared<bool>(false);                auto connection_error_called = std::make_shared<bool>(false);                auto tim = std::make_shared<as::steady_timer>(ioc_con);                tim->expires_after(underlying_connect_timeout_);                tim->async_wait(                    [                        this,                        socket,                        tim,                        underlying_finished,                        connection_error_called,                        &ioc_con                    ]                    (error_code ec) {                        if (*underlying_finished) return;                        if (ec) return; // timer cancelled                        socket->post(                            [this, socket, connection_error_called, &ioc_con] {                                boost::system::error_code close_ec;                                socket->lowest_layer().close(close_ec);                                if (h_connection_error_ && !*connection_error_called) {                                    h_connection_error_(                                        boost::system::errc::make_error_code(                                            boost::system::errc::stream_timeout                                        ),                                        ioc_con                                    );                                    *connection_error_called = true;                                }                            }                        );                    }                );                auto ps = socket.get();                auto username = std::make_shared<optional<std::string>>(); // shared_ptr for username                auto verify_cb_ = [this, username] // copy capture socket shared_ptr                    (bool preverified, boost::asio::ssl::verify_context& ctx) {                        // user can set username in the callback                        return verify_cb_with_username_                            ? verify_cb_with_username_(preverified, ctx, username)                            : false;                };                ctx_.set_verify_mode(MQTT_NS::tls::verify_peer);                ctx_.set_verify_callback(verify_cb_);                ps->next_layer().async_handshake(                    tls::stream_base::server,                    [                        this,                        socket = force_move(socket),                        tim,                        underlying_finished,                        connection_error_called,                        &ioc_con,                        username                    ]                    (error_code ec) mutable {                        if (ec) {                            *underlying_finished = true;                            tim->cancel();                            return;                        }                        auto sb = std::make_shared<boost::asio::streambuf>();                        auto request = std::make_shared<boost::beast::http::request<boost::beast::http::string_body>>();                        auto ps = socket.get();                        boost::beast::http::async_read(                            ps->next_layer(),                            *sb,                            *request,                            [                                this,                                socket = force_move(socket),                                sb,                                request,                                tim,                                underlying_finished,                                connection_error_called,                                &ioc_con,                                username                            ]                            (error_code ec, std::size_t) mutable {                                if (ec) {                                    *underlying_finished = true;                                    tim->cancel();                                    if (h_connection_error_ && !*connection_error_called) {                                        h_connection_error_(ec, ioc_con);                                        *connection_error_called = true;                                    }                                    return;                                }                                if (!boost::beast::websocket::is_upgrade(*request)) {                                    *underlying_finished = true;                                    tim->cancel();                                    if (h_connection_error_ && !*connection_error_called) {                                        h_connection_error_(                                            boost::system::errc::make_error_code(                                                boost::system::errc::protocol_error                                            ),                                            ioc_con                                        );                                        *connection_error_called = true;                                    }                                    return;                                }                                auto ps = socket.get();#if BOOST_BEAST_VERSION >= 248                                auto it = request->find("Sec-WebSocket-Protocol");                                if (it != request->end()) {                                    ps->set_option(                                        boost::beast::websocket::stream_base::decorator(                                            [name = it->name(), value = it->value()] // name is enum, value is boost::string_view                                            (boost::beast::websocket::response_type& res) {                                                // This lambda is called before the scope out point *1                                                res.set(name, value);                                            }                                        )                                    );                                }                                ps->async_accept(                                    *request,                                    [                                        this,                                        socket = force_move(socket),                                        tim,                                        underlying_finished,                                        connection_error_called,                                        &ioc_con,                                        username                                    ]                                    (error_code ec) mutable {                                        *underlying_finished = true;                                        tim->cancel();                                        if (ec) {                                            if (h_connection_error_ && !*connection_error_called) {                                                h_connection_error_(ec, ioc_con);                                                *connection_error_called = true;                                            }                                            return;                                        }                                        auto sp = std::make_shared<endpoint_t>(ioc_con, force_move(socket), version_);                                        sp->set_preauthed_user_name(*username);                                        if (h_accept_) h_accept_(force_move(sp));                                    }                                );#else  // BOOST_BEAST_VERSION >= 248                                ps->async_accept_ex(                                    *request,                                    [request]                                    (boost::beast::websocket::response_type& m) {                                        auto it = request->find("Sec-WebSocket-Protocol");                                        if (it != request->end()) {                                            m.insert(it->name(), it->value());                                        }                                    },                                    [                                        this,                                        socket = force_move(socket),                                        tim,                                        underlying_finished,                                        connection_error_called,                                        &ioc_con,                                        username                                    ]                                    (error_code ec) mutable {                                        *underlying_finished = true;                                        tim->cancel();                                        if (ec) {                                            if (h_connection_error_ && *connection_error_called) {                                                h_connection_error_(ec, ioc_con);                                                *connection_error_called = true;                                            }                                            return;                                        }                                        // TODO: The use of force_move on this line of code causes                                        // a static assertion that socket is a const object when                                        // TLS is enabled, and WS is enabled, with Boost 1.70, and gcc 8.3.0                                        auto sp = std::make_shared<endpoint_t>(ioc_con, socket, version_);                                        sp->set_preauthed_user_name(*username);                                        if (h_accept_) h_accept_(force_move(sp));                                    }                                );#endif // BOOST_BEAST_VERSION >= 248                                // scope out point *1                            }                        );                    }                );                do_accept();            }        );    }private:    verify_cb_t verify_cb_with_username_;    as::ip::tcp::endpoint ep_;    as::io_context& ioc_accept_;    as::io_context* ioc_con_ = nullptr;    std::function<as::io_context&()> ioc_con_getter_;    optional<as::ip::tcp::acceptor> acceptor_;    std::function<void(as::ip::tcp::acceptor&)> config_;    bool close_request_{false};    accept_handler h_accept_;    connection_error_handler h_connection_error_;    error_handler_with_ioc h_error_;    tls::context ctx_;    protocol_version version_ = protocol_version::undetermined;    std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);};#endif // defined(MQTT_USE_TLS)#endif // defined(MQTT_USE_WS)} // namespace MQTT_NS#endif // MQTT_SERVER_HPP
 |