tcp_endpoint.hpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. // Copyright Takatoshi Kondo 2017
  2. //
  3. // Distributed under the Boost Software License, Version 1.0.
  4. // (See accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #if !defined(MQTT_TCP_ENDPOINT_HPP)
  7. #define MQTT_TCP_ENDPOINT_HPP
  8. #include <boost/asio.hpp>
  9. #include <boost/asio/bind_executor.hpp>
  10. #include <mqtt/namespace.hpp>
  11. #include <mqtt/type_erased_socket.hpp>
  12. #include <mqtt/move.hpp>
  13. #include <mqtt/attributes.hpp>
  14. #include <mqtt/tls.hpp>
  15. #include <mqtt/log.hpp>
  16. namespace MQTT_NS {
  17. namespace as = boost::asio;
  18. template <typename Socket, typename Strand>
  19. class tcp_endpoint : public socket {
  20. public:
  21. template <typename... Args>
  22. explicit tcp_endpoint(as::io_context& ioc, Args&&... args)
  23. :tcp_(ioc, std::forward<Args>(args)...),
  24. strand_(ioc.get_executor())
  25. {}
  26. MQTT_ALWAYS_INLINE void async_read(
  27. as::mutable_buffer buffers,
  28. std::function<void(error_code, std::size_t)> handler
  29. ) override final {
  30. as::async_read(
  31. tcp_,
  32. force_move(buffers),
  33. as::bind_executor(
  34. strand_,
  35. force_move(handler)
  36. )
  37. );
  38. }
  39. MQTT_ALWAYS_INLINE void async_write(
  40. std::vector<as::const_buffer> buffers,
  41. std::function<void(error_code, std::size_t)> handler
  42. ) override final {
  43. as::async_write(
  44. tcp_,
  45. force_move(buffers),
  46. as::bind_executor(
  47. strand_,
  48. force_move(handler)
  49. )
  50. );
  51. }
  52. MQTT_ALWAYS_INLINE std::size_t write(
  53. std::vector<as::const_buffer> buffers,
  54. boost::system::error_code& ec
  55. ) override final {
  56. return as::write(tcp_,force_move(buffers), ec);
  57. }
  58. MQTT_ALWAYS_INLINE void post(std::function<void()> handler) override final {
  59. as::post(
  60. strand_,
  61. force_move(handler)
  62. );
  63. }
  64. MQTT_ALWAYS_INLINE void dispatch(std::function<void()> handler) override final {
  65. as::dispatch(
  66. strand_,
  67. force_move(handler)
  68. );
  69. }
  70. MQTT_ALWAYS_INLINE void defer(std::function<void()> handler) override final {
  71. as::defer(
  72. strand_,
  73. force_move(handler)
  74. );
  75. }
  76. MQTT_ALWAYS_INLINE bool running_in_this_thread() const override final {
  77. return strand_.running_in_this_thread();
  78. }
  79. MQTT_ALWAYS_INLINE as::ip::tcp::socket::lowest_layer_type& lowest_layer() override final {
  80. return tcp_.lowest_layer();
  81. }
  82. MQTT_ALWAYS_INLINE any native_handle() override final {
  83. return tcp_.native_handle();
  84. }
  85. MQTT_ALWAYS_INLINE void clean_shutdown_and_close(boost::system::error_code& ec) override final {
  86. shutdown_and_close_impl(tcp_, ec);
  87. }
  88. MQTT_ALWAYS_INLINE void async_clean_shutdown_and_close(std::function<void(error_code)> handler) override final {
  89. async_shutdown_and_close_impl(tcp_, force_move(handler));
  90. }
  91. MQTT_ALWAYS_INLINE void force_shutdown_and_close(boost::system::error_code& ec) override final {
  92. tcp_.lowest_layer().shutdown(as::ip::tcp::socket::shutdown_both, ec);
  93. tcp_.lowest_layer().close(ec);
  94. }
  95. MQTT_ALWAYS_INLINE as::any_io_executor get_executor() override final {
  96. return strand_;
  97. }
  98. auto& socket() { return tcp_; }
  99. auto const& socket() const { return tcp_; }
  100. template <typename... Args>
  101. void set_option(Args&& ... args) {
  102. tcp_.set_option(std::forward<Args>(args)...);
  103. }
  104. template <typename... Args>
  105. void async_accept(Args&& ... args) {
  106. tcp_.async_accept(std::forward<Args>(args)...);
  107. }
  108. #if defined(MQTT_USE_TLS)
  109. template <typename... Args>
  110. void handshake(Args&& ... args) {
  111. tcp_.handshake(std::forward<Args>(args)...);
  112. }
  113. template <typename... Args>
  114. void async_handshake(Args&& ... args) {
  115. tcp_.async_handshake(std::forward<Args>(args)...);
  116. }
  117. #endif // defined(MQTT_USE_TLS)
  118. private:
  119. void shutdown_and_close_impl(as::basic_socket<boost::asio::ip::tcp>& s, boost::system::error_code& ec) {
  120. s.shutdown(as::ip::tcp::socket::shutdown_both, ec);
  121. MQTT_LOG("mqtt_impl", trace)
  122. << MQTT_ADD_VALUE(address, this)
  123. << "shutdown ec:"
  124. << ec.message();
  125. s.close(ec);
  126. MQTT_LOG("mqtt_impl", trace)
  127. << MQTT_ADD_VALUE(address, this)
  128. << "close ec:"
  129. << ec.message();
  130. }
  131. void async_shutdown_and_close_impl(as::basic_socket<boost::asio::ip::tcp>& s, std::function<void(error_code)> handler) {
  132. post(
  133. [this, &s, handler = force_move(handler)] () mutable {
  134. error_code ec;
  135. shutdown_and_close_impl(s, ec);
  136. force_move(handler)(ec);
  137. }
  138. );
  139. }
  140. #if defined(MQTT_USE_TLS)
  141. void shutdown_and_close_impl(tls::stream<as::ip::tcp::socket>& s, boost::system::error_code& ec) {
  142. s.shutdown(ec);
  143. MQTT_LOG("mqtt_impl", trace)
  144. << MQTT_ADD_VALUE(address, this)
  145. << "shutdown ec:"
  146. << ec.message();
  147. shutdown_and_close_impl(lowest_layer(), ec);
  148. }
  149. void async_shutdown_and_close_impl(tls::stream<as::ip::tcp::socket>& s, std::function<void(error_code)> handler) {
  150. s.async_shutdown(
  151. as::bind_executor(
  152. strand_,
  153. [this, &s, handler = force_move(handler)] (error_code ec) mutable {
  154. MQTT_LOG("mqtt_impl", trace)
  155. << MQTT_ADD_VALUE(address, this)
  156. << "shutdown ec:"
  157. << ec.message();
  158. shutdown_and_close_impl(s.lowest_layer(), ec);
  159. force_move(handler)(ec);
  160. }
  161. )
  162. );
  163. }
  164. #endif // defined(MQTT_USE_TLS)
  165. private:
  166. Socket tcp_;
  167. Strand strand_;
  168. };
  169. } // namespace MQTT_NS
  170. #endif // MQTT_TCP_ENDPOINT_HPP