Skip to content

Instantly share code, notes, and snippets.

@HungMingWu
Last active February 21, 2019 07:13
Show Gist options
  • Save HungMingWu/229ee0657b9ea706a32ceec8bc8f965a to your computer and use it in GitHub Desktop.
Save HungMingWu/229ee0657b9ea706a32ceec8bc8f965a to your computer and use it in GitHub Desktop.
Write my own custom asio async function
#include <boost/system/error_code.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/asio/detail/handler_type_requirements.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/optional.hpp>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/bind_executor.hpp>
struct Frame1 {
std::string body;
size_t frame_length() const {
return 4 + body.length();
}
static boost::optional<Frame1> parse(const std::vector<uint8_t> &input)
{
if (input.size() < 4)
return boost::none;
size_t frame_len = *(uint32_t *)(&input[0]);
if (input.size() < frame_len)
return boost::none;
std::string body((char *)&input[4], frame_len - 4);
return Frame1{ std::move(body) };
}
};
namespace detail {
template <class AsyncReadStream, class DynamicBuffer, class Frame, class ReadHandler>
class read_frame_op : public boost::asio::coroutine {
AsyncReadStream &s_;
DynamicBuffer &b_;
Frame &f_;
ReadHandler handler_;
public:
read_frame_op(AsyncReadStream &s, DynamicBuffer &b, Frame &f, ReadHandler&& handler)
: s_(s),
b_(b),
f_(f),
handler_(BOOST_ASIO_MOVE_CAST(ReadHandler)(handler))
{
}
void operator()(
boost::system::error_code ec,
std::size_t bytes_transferred = 0)
{
BOOST_ASIO_CORO_REENTER(*this)
{
for (;;)
{
BOOST_ASIO_CORO_YIELD
s_.async_read_some(b_.prepare(65536), std::move(*this));
if (ec) {
handler_(ec);
break;
}
b_.commit(bytes_transferred);
while (true) {
std::vector<uint8_t> target(b_.size());
boost::asio::buffer_copy(boost::asio::buffer(target), b_.data());
auto frame = Frame::parse(target);
if (!frame) break;
f_ = std::move(frame.value());
b_.consume(f_.frame_length());
handler_(ec);
}
}
}
}
};
}
template<class AsyncReadStream,
class DynamicBuffer,
class Frame,
class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(boost::system::error_code))
async_read(AsyncReadStream& s, DynamicBuffer &b, Frame &f, ReadHandler&& handler)
{
// If you get an error on the following line it means that your handler does
// not meet the documented type requirements for a ReadHandler.
BOOST_ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;
boost::asio::async_completion<ReadHandler,
void(boost::system::error_code)> init(handler);
detail::read_frame_op<AsyncReadStream, DynamicBuffer, Frame, ReadHandler>{s, b, f,
std::move(init.completion_handler)}({}, 0);
return init.result.get();
}
template <class Frame, class Derived>
class session
: public boost::asio::coroutine
, public std::enable_shared_from_this<session<Frame, Derived>>
{
boost::asio::ip::tcp::socket socket_;
boost::asio::io_context::strand strand_;
boost::asio::streambuf buffer_;
Frame frame;
public:
// Take ownership of the socket
explicit session(
boost::asio::ip::tcp::socket socket)
: socket_(std::move(socket))
, strand_(socket_.get_io_context())
{
}
// Start the asynchronous operation
void run()
{
async_read_frame({});
}
void async_read_frame(boost::system::error_code ec)
{
BOOST_ASIO_CORO_REENTER(*this) {
while (true) {
BOOST_ASIO_CORO_YIELD async_read(socket_, buffer_, frame,
boost::asio::bind_executor(
strand_,
std::bind(
&session::async_read_frame,
std::enable_shared_from_this<session<Frame, Derived>>::shared_from_this(),
std::placeholders::_1)));
if (ec) {
break;
}
static_cast<Derived *>(this)->processFrame(frame);
}
}
}
};
template <typename Frame>
class session1 : public session<Frame, session1<Frame>> {
public:
using session<Frame, session1<Frame>>::session;
void processFrame(const Frame& frame)
{
printf("Consume %lu bytes\n", frame.frame_length());
printf("%s\n", frame.body.c_str());
}
};
class listener
: public boost::asio::coroutine
, public std::enable_shared_from_this<listener>
{
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ip::tcp::socket socket_;
public:
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint)
: acceptor_(ioc)
, socket_(ioc)
{
boost::system::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if (ec)
{
return;
}
// Allow address reuse
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec)
{
return;
}
// Bind to the server address
acceptor_.bind(endpoint, ec);
if (ec)
{
return;
}
// Start listening for connections
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec)
{
return;
}
}
// Start accepting incoming connections
void
run()
{
if (!acceptor_.is_open())
return;
loop();
}
void
loop(boost::system::error_code ec = {})
{
BOOST_ASIO_CORO_REENTER(*this)
{
while (true)
{
BOOST_ASIO_CORO_YIELD acceptor_.async_accept(
socket_,
std::bind(
&listener::loop,
shared_from_this(),
std::placeholders::_1));
if (ec)
{
}
else
{
// Create the session and run it
std::make_shared<session1<Frame1>>(
std::move(socket_))->run();
}
}
}
}
};
int main()
{
boost::asio::io_context ctx;
auto const address = boost::asio::ip::make_address("0.0.0.0");
auto const port = static_cast<unsigned short>(std::atoi("8080"));
// Create and launch a listening port
std::make_shared<listener>(
ctx,
boost::asio::ip::tcp::endpoint{ address, port })->run();
ctx.run();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment