Last active
February 21, 2019 07:13
-
-
Save HungMingWu/229ee0657b9ea706a32ceec8bc8f965a to your computer and use it in GitHub Desktop.
Write my own custom asio async function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <boost/system/error_code.hpp> | |
#include <boost/asio/async_result.hpp> | |
#include <boost/asio/detail/handler_type_requirements.hpp> | |
#include <boost/asio/coroutine.hpp> | |
#include <boost/asio/ip/tcp.hpp> | |
#include <boost/asio/streambuf.hpp> | |
#include <boost/optional.hpp> | |
#include <boost/asio/io_context_strand.hpp> | |
#include <boost/asio/bind_executor.hpp> | |
struct Frame1 { | |
std::string body; | |
size_t frame_length() const { | |
return 4 + body.length(); | |
} | |
static boost::optional<Frame1> parse(const std::vector<uint8_t> &input) | |
{ | |
if (input.size() < 4) | |
return boost::none; | |
size_t frame_len = *(uint32_t *)(&input[0]); | |
if (input.size() < frame_len) | |
return boost::none; | |
std::string body((char *)&input[4], frame_len - 4); | |
return Frame1{ std::move(body) }; | |
} | |
}; | |
namespace detail { | |
template <class AsyncReadStream, class DynamicBuffer, class Frame, class ReadHandler> | |
class read_frame_op : public boost::asio::coroutine { | |
AsyncReadStream &s_; | |
DynamicBuffer &b_; | |
Frame &f_; | |
ReadHandler handler_; | |
public: | |
read_frame_op(AsyncReadStream &s, DynamicBuffer &b, Frame &f, ReadHandler&& handler) | |
: s_(s), | |
b_(b), | |
f_(f), | |
handler_(BOOST_ASIO_MOVE_CAST(ReadHandler)(handler)) | |
{ | |
} | |
void operator()( | |
boost::system::error_code ec, | |
std::size_t bytes_transferred = 0) | |
{ | |
BOOST_ASIO_CORO_REENTER(*this) | |
{ | |
for (;;) | |
{ | |
BOOST_ASIO_CORO_YIELD | |
s_.async_read_some(b_.prepare(65536), std::move(*this)); | |
if (ec) { | |
handler_(ec); | |
break; | |
} | |
b_.commit(bytes_transferred); | |
while (true) { | |
std::vector<uint8_t> target(b_.size()); | |
boost::asio::buffer_copy(boost::asio::buffer(target), b_.data()); | |
auto frame = Frame::parse(target); | |
if (!frame) break; | |
f_ = std::move(frame.value()); | |
b_.consume(f_.frame_length()); | |
handler_(ec); | |
} | |
} | |
} | |
} | |
}; | |
} | |
template<class AsyncReadStream, | |
class DynamicBuffer, | |
class Frame, | |
class ReadHandler> | |
BOOST_ASIO_INITFN_RESULT_TYPE( | |
ReadHandler, void(boost::system::error_code)) | |
async_read(AsyncReadStream& s, DynamicBuffer &b, Frame &f, ReadHandler&& handler) | |
{ | |
// If you get an error on the following line it means that your handler does | |
// not meet the documented type requirements for a ReadHandler. | |
BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check; | |
boost::asio::async_completion<ReadHandler, | |
void(boost::system::error_code)> init(handler); | |
detail::read_frame_op<AsyncReadStream, DynamicBuffer, Frame, ReadHandler>{s, b, f, | |
std::move(init.completion_handler)}({}, 0); | |
return init.result.get(); | |
} | |
template <class Frame, class Derived> | |
class session | |
: public boost::asio::coroutine | |
, public std::enable_shared_from_this<session<Frame, Derived>> | |
{ | |
boost::asio::ip::tcp::socket socket_; | |
boost::asio::io_context::strand strand_; | |
boost::asio::streambuf buffer_; | |
Frame frame; | |
public: | |
// Take ownership of the socket | |
explicit session( | |
boost::asio::ip::tcp::socket socket) | |
: socket_(std::move(socket)) | |
, strand_(socket_.get_io_context()) | |
{ | |
} | |
// Start the asynchronous operation | |
void run() | |
{ | |
async_read_frame({}); | |
} | |
void async_read_frame(boost::system::error_code ec) | |
{ | |
BOOST_ASIO_CORO_REENTER(*this) { | |
while (true) { | |
BOOST_ASIO_CORO_YIELD async_read(socket_, buffer_, frame, | |
boost::asio::bind_executor( | |
strand_, | |
std::bind( | |
&session::async_read_frame, | |
std::enable_shared_from_this<session<Frame, Derived>>::shared_from_this(), | |
std::placeholders::_1))); | |
if (ec) { | |
break; | |
} | |
static_cast<Derived *>(this)->processFrame(frame); | |
} | |
} | |
} | |
}; | |
template <typename Frame> | |
class session1 : public session<Frame, session1<Frame>> { | |
public: | |
using session<Frame, session1<Frame>>::session; | |
void processFrame(const Frame& frame) | |
{ | |
printf("Consume %lu bytes\n", frame.frame_length()); | |
printf("%s\n", frame.body.c_str()); | |
} | |
}; | |
class listener | |
: public boost::asio::coroutine | |
, public std::enable_shared_from_this<listener> | |
{ | |
boost::asio::ip::tcp::acceptor acceptor_; | |
boost::asio::ip::tcp::socket socket_; | |
public: | |
listener( | |
boost::asio::io_context& ioc, | |
boost::asio::ip::tcp::endpoint endpoint) | |
: acceptor_(ioc) | |
, socket_(ioc) | |
{ | |
boost::system::error_code ec; | |
// Open the acceptor | |
acceptor_.open(endpoint.protocol(), ec); | |
if (ec) | |
{ | |
return; | |
} | |
// Allow address reuse | |
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); | |
if (ec) | |
{ | |
return; | |
} | |
// Bind to the server address | |
acceptor_.bind(endpoint, ec); | |
if (ec) | |
{ | |
return; | |
} | |
// Start listening for connections | |
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); | |
if (ec) | |
{ | |
return; | |
} | |
} | |
// Start accepting incoming connections | |
void | |
run() | |
{ | |
if (!acceptor_.is_open()) | |
return; | |
loop(); | |
} | |
void | |
loop(boost::system::error_code ec = {}) | |
{ | |
BOOST_ASIO_CORO_REENTER(*this) | |
{ | |
while (true) | |
{ | |
BOOST_ASIO_CORO_YIELD acceptor_.async_accept( | |
socket_, | |
std::bind( | |
&listener::loop, | |
shared_from_this(), | |
std::placeholders::_1)); | |
if (ec) | |
{ | |
} | |
else | |
{ | |
// Create the session and run it | |
std::make_shared<session1<Frame1>>( | |
std::move(socket_))->run(); | |
} | |
} | |
} | |
} | |
}; | |
int main() | |
{ | |
boost::asio::io_context ctx; | |
auto const address = boost::asio::ip::make_address("0.0.0.0"); | |
auto const port = static_cast<unsigned short>(std::atoi("8080")); | |
// Create and launch a listening port | |
std::make_shared<listener>( | |
ctx, | |
boost::asio::ip::tcp::endpoint{ address, port })->run(); | |
ctx.run(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment