ws_endpoint.hpp 12 KB


  1. // Copyright Takatoshi Kondo 2016
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_WS_ENDPOINT_HPP)
  7. #define MQTT_WS_ENDPOINT_HPP
  8. #include <boost/beast/websocket.hpp>
  9. #include <boost/beast/core/flat_buffer.hpp>
  10. #include <boost/asio/bind_executor.hpp>
  11. #include <mqtt/namespace.hpp>
  12. #include <mqtt/type_erased_socket.hpp>
  13. #include <mqtt/move.hpp>
  14. #include <mqtt/attributes.hpp>
  15. #include <mqtt/string_view.hpp>
  16. #include <mqtt/error_code.hpp>
  17. #include <mqtt/tls.hpp>
  18. #include <mqtt/log.hpp>
  19. namespace MQTT_NS {
  20. namespace as = boost::asio;
  21. template <typename Socket, typename Strand>
  22. class ws_endpoint : public socket {
  23. public:
  24. template <typename... Args>
  25. explicit ws_endpoint(as::io_context& ioc, Args&&... args)
  26. :ws_(ioc, std::forward<Args>(args)...),
  27. strand_(ioc.get_executor())
  28. {
  29. ws_.binary(true);
  30. ws_.set_option(
  31. boost::beast::websocket::stream_base::decorator(
  32. [](boost::beast::websocket::request_type& req) {
  33. req.set("Sec-WebSocket-Protocol", "mqtt");
  34. }
  35. )
  36. );
  37. }
  38. MQTT_ALWAYS_INLINE void async_read(
  39. as::mutable_buffer buffers,
  40. std::function<void(error_code, std::size_t)> handler
  41. ) override final {
  42. auto req_size = as::buffer_size(buffers);
  43. using beast_read_handler_t =
  44. std::function<void(error_code ec, std::shared_ptr<void>)>;
  45. std::shared_ptr<beast_read_handler_t> beast_read_handler;
  46. if (req_size <= buffer_.size()) {
  47. as::buffer_copy(buffers, buffer_.data(), req_size);
  48. buffer_.consume(req_size);
  49. handler(boost::system::errc::make_error_code(boost::system::errc::success), req_size);
  50. return;
  51. }
  52. beast_read_handler.reset(
  53. new beast_read_handler_t(
  54. [this, req_size, buffers, handler = force_move(handler)]
  55. (error_code ec, std::shared_ptr<void> const& v) mutable {
  56. if (ec) {
  57. force_move(handler)(ec, 0);
  58. return;
  59. }
  60. if (!ws_.got_binary()) {
  61. buffer_.consume(buffer_.size());
  62. force_move(handler)
  63. (boost::system::errc::make_error_code(boost::system::errc::bad_message), 0);
  64. return;
  65. }
  66. if (req_size > buffer_.size()) {
  67. auto beast_read_handler = std::static_pointer_cast<beast_read_handler_t>(v);
  68. ws_.async_read(
  69. buffer_,
  70. as::bind_executor(
  71. strand_,
  72. [beast_read_handler]
  73. (error_code ec, std::size_t) {
  74. (*beast_read_handler)(ec, beast_read_handler);
  75. }
  76. )
  77. );
  78. return;
  79. }
  80. as::buffer_copy(buffers, buffer_.data(), req_size);
  81. buffer_.consume(req_size);
  82. force_move(handler)(boost::system::errc::make_error_code(boost::system::errc::success), req_size);
  83. }
  84. )
  85. );
  86. ws_.async_read(
  87. buffer_,
  88. as::bind_executor(
  89. strand_,
  90. [beast_read_handler]
  91. (error_code ec, std::size_t) {
  92. (*beast_read_handler)(ec, beast_read_handler);
  93. }
  94. )
  95. );
  96. }
  97. MQTT_ALWAYS_INLINE void async_write(
  98. std::vector<as::const_buffer> buffers,
  99. std::function<void(error_code, std::size_t)> handler
  100. ) override final {
  101. ws_.async_write(
  102. buffers,
  103. as::bind_executor(
  104. strand_,
  105. force_move(handler)
  106. )
  107. );
  108. }
  109. MQTT_ALWAYS_INLINE std::size_t write(
  110. std::vector<as::const_buffer> buffers,
  111. boost::system::error_code& ec
  112. ) override final {
  113. ws_.write(buffers, ec);
  114. return as::buffer_size(buffers);
  115. }
  116. MQTT_ALWAYS_INLINE void post(std::function<void()> handler) override final {
  117. as::post(
  118. strand_,
  119. force_move(handler)
  120. );
  121. }
  122. MQTT_ALWAYS_INLINE void dispatch(std::function<void()> handler) override final {
  123. as::dispatch(
  124. strand_,
  125. force_move(handler)
  126. );
  127. }
  128. MQTT_ALWAYS_INLINE void defer(std::function<void()> handler) override final {
  129. as::defer(
  130. strand_,
  131. force_move(handler)
  132. );
  133. }
  134. MQTT_ALWAYS_INLINE bool running_in_this_thread() const override final {
  135. return strand_.running_in_this_thread();
  136. }
  137. MQTT_ALWAYS_INLINE as::ip::tcp::socket::lowest_layer_type& lowest_layer() override final {
  138. return boost::beast::get_lowest_layer(ws_);
  139. }
  140. MQTT_ALWAYS_INLINE any native_handle() override final {
  141. return next_layer().native_handle();
  142. }
  143. MQTT_ALWAYS_INLINE void clean_shutdown_and_close(boost::system::error_code& ec) override final {
  144. if (ws_.is_open()) {
  145. // WebSocket closing process
  146. MQTT_LOG("mqtt_impl", trace)
  147. << MQTT_ADD_VALUE(address, this)
  148. << "call beast close";
  149. ws_.close(boost::beast::websocket::close_code::normal, ec);
  150. MQTT_LOG("mqtt_impl", trace)
  151. << MQTT_ADD_VALUE(address, this)
  152. << "ws close ec:"
  153. << ec.message();
  154. if (!ec) {
  155. do {
  156. boost::beast::flat_buffer buffer;
  157. ws_.read(buffer, ec);
  158. } while (!ec);
  159. if (ec == boost::beast::websocket::error::closed) {
  160. ec = boost::system::errc::make_error_code(boost::system::errc::success);
  161. }
  162. MQTT_LOG("mqtt_impl", trace)
  163. << MQTT_ADD_VALUE(address, this)
  164. << "ws read ec:"
  165. << ec.message();
  166. }
  167. }
  168. shutdown_and_close_impl(next_layer(), ec);
  169. }
  170. MQTT_ALWAYS_INLINE void async_clean_shutdown_and_close(std::function<void(error_code)> handler) override final {
  171. if (ws_.is_open()) {
  172. // WebSocket closing process
  173. MQTT_LOG("mqtt_impl", trace)
  174. << MQTT_ADD_VALUE(address, this)
  175. << "call beast async_close";
  176. ws_.async_close(
  177. boost::beast::websocket::close_code::normal,
  178. as::bind_executor(
  179. strand_,
  180. [this, handler = force_move(handler)]
  181. (error_code ec) mutable {
  182. if (ec) {
  183. MQTT_LOG("mqtt_impl", trace)
  184. << MQTT_ADD_VALUE(address, this)
  185. << "ws async_close ec:"
  186. << ec.message();
  187. async_shutdown_and_close_impl(next_layer(), force_move(handler));
  188. }
  189. else {
  190. async_read_until_closed(force_move(handler));
  191. }
  192. }
  193. )
  194. );
  195. }
  196. else {
  197. MQTT_LOG("mqtt_impl", trace)
  198. << MQTT_ADD_VALUE(address, this)
  199. << "ws async_close already closed";
  200. async_shutdown_and_close_impl(next_layer(), force_move(handler));
  201. }
  202. }
  203. MQTT_ALWAYS_INLINE void force_shutdown_and_close(boost::system::error_code& ec) override final {
  204. lowest_layer().shutdown(as::ip::tcp::socket::shutdown_both, ec);
  205. lowest_layer().close(ec);
  206. }
  207. MQTT_ALWAYS_INLINE as::any_io_executor get_executor() override final {
  208. return strand_;
  209. }
  210. typename boost::beast::websocket::stream<Socket>::next_layer_type& next_layer() {
  211. return ws_.next_layer();
  212. }
  213. template <typename T>
  214. void set_option(T&& t) {
  215. ws_.set_option(std::forward<T>(t));
  216. }
  217. template <typename ConstBufferSequence, typename AcceptHandler>
  218. void async_accept(
  219. ConstBufferSequence const& buffers,
  220. AcceptHandler&& handler) {
  221. ws_.async_accept(buffers, std::forward<AcceptHandler>(handler));
  222. }
  223. template<typename ConstBufferSequence, typename ResponseDecorator, typename AcceptHandler>
  224. void async_accept_ex(
  225. ConstBufferSequence const& buffers,
  226. ResponseDecorator const& decorator,
  227. AcceptHandler&& handler) {
  228. ws_.async_accept_ex(buffers, decorator, std::forward<AcceptHandler>(handler));
  229. }
  230. template <typename... Args>
  231. void async_handshake(Args&& ... args) {
  232. ws_.async_handshake(std::forward<Args>(args)...);
  233. }
  234. template <typename... Args>
  235. void handshake(Args&& ... args) {
  236. ws_.handshake(std::forward<Args>(args)...);
  237. }
  238. template <typename ConstBufferSequence>
  239. std::size_t write(
  240. ConstBufferSequence const& buffers) {
  241. ws_.write(buffers);
  242. return as::buffer_size(buffers);
  243. }
  244. private:
  245. void async_read_until_closed(std::function<void(error_code)> handler) {
  246. auto buffer = std::make_shared<boost::beast::flat_buffer>();
  247. ws_.async_read(
  248. *buffer,
  249. as::bind_executor(
  250. strand_,
  251. [this, handler = force_move(handler)]
  252. (error_code ec, std::size_t) mutable {
  253. if (ec) {
  254. if (ec == boost::beast::websocket::error::closed) {
  255. ec = boost::system::errc::make_error_code(boost::system::errc::success);
  256. }
  257. MQTT_LOG("mqtt_impl", trace)
  258. << MQTT_ADD_VALUE(address, this)
  259. << "ws async_read ec:"
  260. << ec.message();
  261. async_shutdown_and_close_impl(next_layer(), force_move(handler));
  262. }
  263. else {
  264. async_read_until_closed(force_move(handler));
  265. }
  266. }
  267. )
  268. );
  269. }
  270. void shutdown_and_close_impl(as::basic_socket<boost::asio::ip::tcp>& s, boost::system::error_code& ec) {
  271. s.shutdown(as::ip::tcp::socket::shutdown_both, ec);
  272. MQTT_LOG("mqtt_impl", trace)
  273. << MQTT_ADD_VALUE(address, this)
  274. << "shutdown ec:"
  275. << ec.message();
  276. s.close(ec);
  277. MQTT_LOG("mqtt_impl", trace)
  278. << MQTT_ADD_VALUE(address, this)
  279. << "close ec:"
  280. << ec.message();
  281. }
  282. void async_shutdown_and_close_impl(as::basic_socket<boost::asio::ip::tcp>& s, std::function<void(error_code)> handler) {
  283. post(
  284. [this, &s, handler = force_move(handler)] () mutable {
  285. error_code ec;
  286. shutdown_and_close_impl(s, ec);
  287. force_move(handler)(ec);
  288. }
  289. );
  290. }
  291. #if defined(MQTT_USE_TLS)
  292. void shutdown_and_close_impl(tls::stream<as::ip::tcp::socket>& s, boost::system::error_code& ec) {
  293. s.shutdown(ec);
  294. MQTT_LOG("mqtt_impl", trace)
  295. << MQTT_ADD_VALUE(address, this)
  296. << "shutdown ec:"
  297. << ec.message();
  298. shutdown_and_close_impl(lowest_layer(), ec);
  299. }
  300. void async_shutdown_and_close_impl(tls::stream<as::ip::tcp::socket>& s, std::function<void(error_code)> handler) {
  301. s.async_shutdown(
  302. as::bind_executor(
  303. strand_,
  304. [this, handler = force_move(handler)] (error_code ec) mutable {
  305. MQTT_LOG("mqtt_impl", trace)
  306. << MQTT_ADD_VALUE(address, this)
  307. << "shutdown ec:"
  308. << ec.message();
  309. shutdown_and_close_impl(lowest_layer(), ec);
  310. force_move(handler)(ec);
  311. }
  312. )
  313. );
  314. }
  315. #endif // defined(MQTT_USE_TLS)
  316. private:
  317. boost::beast::websocket::stream<Socket> ws_;
  318. boost::beast::flat_buffer buffer_;
  319. Strand strand_;
  320. };
  321. } // namespace MQTT_NS
  322. #endif // MQTT_WS_ENDPOINT_HPP