Skip to content

Instantly share code, notes, and snippets.

@HungMingWu
Last active November 2, 2018 09:57
Show Gist options
  • Save HungMingWu/85557b93ba42ca0a6d8b15c311b7fa65 to your computer and use it in GitHub Desktop.
Save HungMingWu/85557b93ba42ca0a6d8b15c311b7fa65 to your computer and use it in GitHub Desktop.
Uncomplet C++ ASIO Example
#include <iostream>
#include <sstream>
#include <functional>
#include <boost/optional.hpp>
#include <boost/asio/ts/internet.hpp>
#include <boost/asio/ts/buffer.hpp>
#include <boost/asio.hpp>
// #define NETWORK_TS_ENABLED
#define BOOST_BEAST_HANDLER_INIT(type, sig) \
boost::asio::async_completion<type, sig> init{handler}
namespace detail {
template<class T, class = void>
struct has_read_size_helper : std::false_type {};
template<class T>
struct has_read_size_helper<T, decltype(
read_size_helper(std::declval<T&>(), 512),
(void)0)> : std::true_type
{
};
template<class DynamicBuffer>
std::size_t
read_size(DynamicBuffer& buffer,
std::size_t max_size, std::true_type)
{
return read_size_helper(buffer, max_size);
}
template<class DynamicBuffer>
std::size_t
read_size(DynamicBuffer& buffer,
std::size_t max_size, std::false_type)
{
static_assert(
boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
BOOST_ASSERT(max_size >= 1);
auto const size = buffer.size();
auto const limit = buffer.max_size() - size;
BOOST_ASSERT(size <= buffer.max_size());
return (std::min<std::size_t>)(
(std::max<std::size_t>)(512, buffer.capacity() - size),
(std::min<std::size_t>)(max_size, limit));
}
}
template<class DynamicBuffer>
inline
std::size_t
read_size(
DynamicBuffer& buffer, std::size_t max_size)
{
return detail::read_size(buffer, max_size,
detail::has_read_size_helper<DynamicBuffer>{});
}
template<class DynamicBuffer>
std::size_t
read_size_or_throw(
DynamicBuffer& buffer, std::size_t max_size)
{
auto const n = read_size(buffer, max_size);
if (n == 0)
BOOST_THROW_EXCEPTION(std::length_error{
"buffer overflow" });
return n;
}
template<class Stream, class DynamicBuffer, class Handler>
class read_some_op : public boost::asio::coroutine
{
Stream& s_;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
DynamicBuffer& b_;
std::size_t bytes_transferred_ = 0;
Handler h_;
bool cont_ = false;
public:
read_some_op(read_some_op&&) = default;
read_some_op(read_some_op const&) = delete;
template<class DeducedHandler>
read_some_op(DeducedHandler&& h, Stream& s,
DynamicBuffer& b)
: s_(s)
, wg_(s_.get_executor())
, b_(b)
, h_(std::forward<DeducedHandler>(h))
{
}
using allocator_type =
boost::asio::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (boost::asio::get_associated_allocator)(h_);
}
#if defined(NETWORK_TS_ENABLED)
using executor_type = boost::asio::associated_executor_t<
Handler, decltype(std::declval<Stream&&&>().get_executor())>;
executor_type get_executor() const noexcept
{
return (boost::asio::get_associated_executor)(
h_, s_.get_executor());
}
#else
template<class Function>
friend void asio_handler_invoke(Function&& f, read_some_op* op)
{
boost::asio::asio_handler_invoke(f, std::addressof(op->h_));
}
#endif
void
operator()(
boost::system::error_code ec,
std::size_t bytes_transferred = 0,
bool cont = true);
friend bool asio_handler_is_continuation(read_some_op* op)
{
using boost::asio::asio_handler_is_continuation;
return op->cont_ ? true :
asio_handler_is_continuation(
std::addressof(op->h_));
}
};
template <class Stream, class DynamicBuffer, class Handler>
void read_some_op<Stream, DynamicBuffer, Handler>::
operator()(
boost::system::error_code ec,
std::size_t bytes_transferred,
bool cont)
{
cont_ = cont;
boost::optional<typename DynamicBuffer::mutable_buffers_type> mb;
BOOST_ASIO_CORO_REENTER(*this)
{
if (b_.size() == 0)
goto do_read;
for (;;)
{
// parse
{
auto const used = 0;// p_.put(b_.data(), ec);
bytes_transferred_ += used;
b_.consume(used);
}
/*
if (ec != http::error::need_more)
break;
*/
do_read:
try
{
mb.emplace(b_.prepare(
read_size_or_throw(b_, 65536)));
}
catch (std::length_error const&)
{
//ec = error::buffer_overflow;
break;
}
BOOST_ASIO_CORO_YIELD
s_.async_read_some(*mb, std::move(*this));
if (ec == boost::asio::error::eof)
{
BOOST_ASSERT(bytes_transferred == 0);
if (true) //p_.got_some())
{
// caller sees EOF on next read
ec.assign(0, ec.category());
//p_.put_eof(ec);
if (ec)
goto upcall;
//BOOST_ASSERT(p_.is_done());
goto upcall;
//BOOST_ASSERT(p_.is_done());
goto upcall;
}
//ec = error::end_of_stream;
break;
}
if (ec)
break;
b_.commit(bytes_transferred);
}
upcall:
if (!cont_)
{
BOOST_ASIO_CORO_YIELD
boost::asio::post(
s_.get_executor(),
bind_handler(std::move(*this),
ec, bytes_transferred_));
}
h_(ec, bytes_transferred_);
}
}
template<
class AsyncReadStream,
class DynamicBuffer,
class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(boost::system::error_code, std::string))
async_read_some(
AsyncReadStream& stream,
DynamicBuffer& buffer,
ReadHandler&& handler)
{
BOOST_BEAST_HANDLER_INIT(
ReadHandler, void(boost::system::error_code, std::string));
read_some_op<AsyncReadStream,
DynamicBuffer, BOOST_ASIO_HANDLER_TYPE(
ReadHandler, void(boost::system::error_code, std::string))>{
std::move(init.completion_handler), stream, buffer}(
{}, 0, false);
return init.result.get();
}
template<class Stream, class DynamicBuffer,
class Handler>
class read_op
: public boost::asio::coroutine
{
Stream& s_;
boost::asio::executor_work_guard<decltype(
std::declval<Stream&>().get_executor())> wg_;
DynamicBuffer& b_;
std::size_t bytes_transferred_ = 0;
Handler h_;
bool cont_ = false;
public:
read_op(read_op&&) = default;
read_op(read_op const&) = delete;
template<class DeducedHandler>
read_op(DeducedHandler&& h, Stream& s,
DynamicBuffer& b)
: s_(s)
, wg_(s_.get_executor())
, b_(b)
, h_(std::forward<DeducedHandler>(h))
{
}
using allocator_type =
boost::asio::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return (boost::asio::get_associated_allocator)(h_);
}
#if defined(NETWORK_TS_ENABLED)
using executor_type = boost::asio::associated_executor_t<
Handler, decltype(std::declval<Stream&>().get_executor())>;
executor_type get_executor() const noexcept
{
return boost::asio::get_associated_executor(
h_, s_.get_executor());
}
#else
template<class Function>
friend void asio_handler_invoke(Function&& f, read_op* op)
{
boost::asio::asio_handler_invoke(f, std::addressof(op->h_));
}
#endif
void
operator()(
boost::system::error_code ec,
std::size_t bytes_transferred = 0,
bool cont = true);
friend
bool asio_handler_is_continuation(read_op* op)
{
using boost::asio::asio_handler_is_continuation;
return op->cont_ ? true :
asio_handler_is_continuation(
std::addressof(op->h_));
}
};
struct Parser {
static bool Parse() {
static int count = 0;
count++;
if (count == 5) return true;
return false;
}
};
template<class Stream, class DynamicBuffer, class Handler>
void read_op<Stream, DynamicBuffer, Handler>::
operator()(
boost::system::error_code ec,
std::size_t bytes_transferred,
bool cont)
{
cont_ = cont;
BOOST_ASIO_CORO_REENTER(*this)
{
if (Parser::Parse())
{
BOOST_ASIO_CORO_YIELD
boost::asio::post(s_.get_executor(),
bind_handler(std::move(*this), ec));
goto upcall;
}
for (;;)
{
BOOST_ASIO_CORO_YIELD
async_read_some(
s_, b_, std::move(*this));
if (ec)
goto upcall;
bytes_transferred_ += bytes_transferred;
if (true)
goto upcall;
}
upcall:
h_(ec, "hello");
}
}
template<
class AsyncReadStream,
class DynamicBuffer,
class ReadHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
ReadHandler, void(boost::system::error_code, std::string))
async_read1(
AsyncReadStream& stream,
DynamicBuffer& buffer,
ReadHandler&& handler)
{
BOOST_BEAST_HANDLER_INIT(
ReadHandler, void(boost::system::error_code, std::string));
read_op<AsyncReadStream, DynamicBuffer,
BOOST_ASIO_HANDLER_TYPE(ReadHandler, void(boost::system::error_code, std::string))>{
std::move(init.completion_handler), stream, buffer}(
{}, 0, false);
return init.result.get();
}
int main()
{
boost::asio::io_context ctx;
boost::asio::ip::tcp::socket s(ctx);
boost::asio::streambuf buf;
std::function<void(boost::system::error_code, std::string)> func = [](boost::system::error_code, std::string value) {
std::cout << value << "\n";
};
async_read1(s, buf, func);
ctx.run();
}
#include <iostream>
#include <chrono>
#include <thread>
#include <boost/asio.hpp>
struct my_handler
{
void operator()() const
{
std::cout << "my_handler: " << std::this_thread::get_id() << std::endl;
}
friend auto asio_handler_is_continuation(my_handler* h)
-> bool
{
return h->is_continuation;
}
bool is_continuation;
};
int main(int argc, char* argv[])
{
boost::asio::io_service io_service{};
io_service.post([&]
{
// handler を登録して 1 秒間 sleep
io_service.post(my_handler{argc != 1});
std::this_thread::sleep_for(std::chrono::seconds{1});
std::cout << "handler1: " << std::this_thread::get_id() << std::endl;
});
auto t1 = std::thread{[&]{
io_service.run();
}};
auto t2 = std::thread{[&]{
io_service.run();
}};
t2.join();
t1.join();
}
#include <iostream>
#include <chrono>
#include <thread>
#define BOOST_ASIO_NO_DEPRECATED
#include <boost/asio/ts/io_context.hpp>
#include <boost/asio/ts/executor.hpp>
struct my_handler
{
void operator()() const
{
std::cout << "my_handler: " << std::this_thread::get_id() << std::endl;
}
};
int main(int argc, char* argv[])
{
boost::asio::io_context io_context{};
boost::asio::post(io_context, [&] {
if (argc != 1)
boost::asio::defer(io_context, my_handler{});
else
boost::asio::post(io_context, my_handler{});
std::this_thread::sleep_for(std::chrono::seconds{ 1 });
std::cout << "handler1: " << std::this_thread::get_id() << std::endl;
});
auto t1 = std::thread{ [&] {
io_context.run();
} };
auto t2 = std::thread{ [&] {
io_context.run();
} };
t2.join();
t1.join();
return 0;
}
//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <array>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <type_traits>
#include <utility>
#define BOOST_ASIO_NO_DEPRECATED
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
// Wrapper class template for handler objects to allow handler memory
// allocation to be customised. Calls to operator() are forwarded to the
// encapsulated handler.
template <typename Handler>
class custom_alloc_handler
{
public:
custom_alloc_handler(Handler h) : handler_(h) {}
template <typename ...Args>
void operator()(Args&&... args)
{
handler_(std::forward<Args>(args)...);
}
std::allocator<void> alloc;
using allocator_type =
boost::asio::associated_allocator_t<Handler>;
allocator_type
get_allocator() const noexcept
{
return boost::asio::get_associated_allocator(handler_, alloc);
}
#if 0
friend void* asio_handler_allocate(std::size_t size,
custom_alloc_handler<Handler>* this_handler)
{
return malloc(size);
}
friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,
custom_alloc_handler<Handler>* this_handler)
{
free(pointer);
}
#endif
private:
Handler handler_;
};
// Helper function to wrap a handler object to add custom allocation.
template <typename Handler>
inline custom_alloc_handler<Handler> make_custom_alloc_handler(Handler h)
{
return custom_alloc_handler<Handler>(h);
}
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
do_read();
}
private:
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_),
make_custom_alloc_handler(
[this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
do_write(length);
}
}));
}
void do_write(std::size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
make_custom_alloc_handler(
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
do_read();
}
}));
}
// The socket used to communicate with the client.
tcp::socket socket_;
// Buffer used to store data received from the client.
std::array<char, 1024> data_;
};
class server
{
public:
server(boost::asio::io_context& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service)
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec)
{
std::make_shared<session>(std::move(socket_))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
};
int main(int argc, char* argv[])
{
try
{
boost::asio::io_context io_service;
server s(io_service, 12345);
io_service.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment