Skip to content

Instantly share code, notes, and snippets.

@th0rex
Created April 12, 2018 17:10
Show Gist options
  • Save th0rex/6e531093b7e7ac690732183201e714c2 to your computer and use it in GitHub Desktop.
Save th0rex/6e531093b7e7ac690732183201e714c2 to your computer and use it in GitHub Desktop.
#pragma once
#include <cstddef>
#include <exception>
#include <system_error>
#include <utility>
#include <variant>
#include <experimental/coroutine>
#include <experimental/net>
namespace ams::async {
using std::experimental::coroutine_handle;
/// Awaitable wrapper around a callback-style asynchronous operation whose
/// completion handler has the shape `(std::error_code, Result)`.
///
/// On suspension the stored function is called with the target object and a
/// completion handler. The handler records either the produced value or the
/// error code and then resumes the suspended coroutine. On resumption the
/// value is returned, or the error is thrown as `std::system_error`.
template <typename Object, typename Function, typename Result>
struct Awaiter {
  Object& _object;
  Function _func;
  // Alternative 0: no outcome yet, 1: produced value, 2: reported error.
  std::variant<std::monostate, Result, std::error_code> _result;

  Awaiter(Object& object, Function&& func)
      : _object{object}, _func{std::forward<Function>(func)}, _result{} {}

  // Always suspend: the operation is only started from await_suspend.
  bool await_ready() { return false; }

  /// Returns the stored value, or throws `std::system_error` when the
  /// operation reported an error.
  Result await_resume() {
    if (const auto* failure = std::get_if<2>(&_result)) {
      throw std::system_error(*failure);
    }
    return std::get<1>(std::move(_result));
  }

  /// Starts the operation; its completion handler resumes `coro`.
  void await_suspend(coroutine_handle<> coro) {
    _func(_object, [this, coro](std::error_code const& ec, Result r) mutable {
      if (ec) {
        _result.template emplace<2>(ec);
      } else {
        _result.template emplace<1>(std::move(r));
      }
      coro.resume();
    });
  }
};
/// Specialization for asynchronous operations that produce no value: the
/// completion handler only receives a `std::error_code`.
///
/// On resumption nothing is returned; a recorded error is thrown as
/// `std::system_error`.
template <typename Object, typename Function>
struct Awaiter<Object, Function, void> {
  Object& _object;
  Function _func;
  // Alternative 0: no error recorded, 1: reported error.
  std::variant<std::monostate, std::error_code> _result;

  Awaiter(Object& object, Function&& func)
      : _object{object}, _func{std::forward<Function>(func)}, _result{} {}

  // Always suspend: the operation is only started from await_suspend.
  bool await_ready() { return false; }

  /// Throws `std::system_error` when the operation reported an error.
  void await_resume() {
    if (const auto* failure = std::get_if<1>(&_result)) {
      throw std::system_error(*failure);
    }
  }

  /// Starts the operation; its completion handler resumes `coro`.
  void await_suspend(coroutine_handle<> coro) {
    _func(_object, [this, coro](std::error_code const& ec) mutable {
      if (ec) {
        this->_result.template emplace<1>(ec);
      }
      coro.resume();
    });
  }
};
template <typename Result, typename Object, typename Function>
auto make_awaiter(Object& object, Function&& function) {
return Awaiter<Object, Function, Result>{object,
std::forward<Function>(function)};
}
template <typename Acceptor>
auto async_accept(Acceptor& a) {
return make_awaiter<typename Acceptor::protocol_type::socket>(
a, [](Acceptor& acceptor, auto&& callback) {
acceptor.async_accept(std::forward<decltype(callback)>(callback));
});
}
/// Returns an awaitable that calls `socket.async_connect(ep, handler)`.
/// The endpoint is moved into the awaitable; no value is produced.
template <typename Socket, typename Endpoint>
auto async_connect(Socket& socket, Endpoint ep) {
  auto start = [target = std::move(ep)](Socket& s, auto&& handler) {
    s.async_connect(target, std::forward<decltype(handler)>(handler));
  };
  return make_awaiter<void>(socket, std::move(start));
}
template <typename Socket, typename MutableBuffer>
auto async_read(Socket& socket, MutableBuffer buffer) {
return make_awaiter<const unsigned long>(
socket, [buffer = std::move(buffer)](Socket & socket, auto&& callback) {
std::experimental::net::async_read(
socket, buffer, std::forward<decltype(callback)>(callback));
});
}
template <typename Socket, typename ConstantBuffer>
auto async_write(Socket& socket, ConstantBuffer buffer) {
return make_awaiter<const unsigned long>(
socket, [buffer = std::move(buffer)](Socket & socket, auto&& callback) {
std::experimental::net::async_write(
socket, buffer, std::forward<decltype(callback)>(callback));
});
}
} // namespace ams::async
#pragma once
#include <array>
#include <memory>
#include <sstream>
#include <string>
#include <cereal/archives/binary.hpp>
#include <async/async.hpp>
namespace ams::communication {
/// Rebuilds a `T` from the first `length` bytes of `data` using cereal's
/// binary input archive.
///
/// The bytes are copied into the stream with an explicit length. Streaming
/// the pointer with `operator<<` would treat the payload as a C string: it
/// would stop at the first zero byte and would read past the end of a buffer
/// that has no terminator.
template <typename T>
T deserialize(std::unique_ptr<char[]> data, std::size_t length) {
  std::stringstream ss;
  ss.write(data.get(), static_cast<std::streamsize>(length));
  cereal::BinaryInputArchive ar{ss};
  T message;
  ar(message);
  return message;
}

/// Legacy overload kept so existing one-argument callers still compile.
///
/// Without a length, the payload size is taken to be the distance to the
/// first zero byte, exactly as before. That is only correct for
/// NUL-terminated data with no embedded zero bytes; prefer the overload that
/// takes an explicit length.
template <typename T>
T deserialize(std::unique_ptr<char[]> data) {
  const std::size_t length = std::char_traits<char>::length(data.get());
  return deserialize<T>(std::move(data), length);
}

/// Serializes `data` with cereal's binary output archive and returns the
/// resulting bytes as a string.
template <typename T>
std::string serialize(T const& data) {
  std::stringstream ss;
  cereal::BinaryOutputArchive ar{ss};
  ar(data);
  return ss.str();
}

/// Reads one length-prefixed message from `socket` and deserializes it into
/// an `Object`.
///
/// Wire format, as read here: a `std::uint64_t` byte count in the host's
/// native representation, followed by that many payload bytes.
///
/// Errors reported by the underlying reads surface as `std::system_error`
/// thrown from the `co_await` expressions.
template <typename Object, typename Socket>
async::task<Object> async_read_deserialize(Socket& socket) {
  // Read straight into the integer instead of reinterpreting a char array,
  // which avoids misaligned and aliasing-violating access.
  std::uint64_t wire_length = 0;
  (void)co_await async::async_read(
      socket, std::experimental::net::buffer(&wire_length, sizeof(wire_length)));
  // NOTE(review): the length comes from the peer and is not bounded here;
  // consider enforcing a maximum message size before allocating.
  const auto length = static_cast<std::size_t>(wire_length);
  auto data = std::make_unique<char[]>(length);
  (void)co_await async::async_read(
      socket, std::experimental::net::buffer(data.get(), length));
  co_return deserialize<Object>(std::move(data), length);
}
/// Serializes `object` and writes it to `socket` as a length-prefixed
/// message: first the payload size as a `std::uint64_t` in the host's native
/// representation, then the payload bytes.
///
/// The task yields the sum of the byte counts reported by the two writes.
///
/// `object` is only read when the coroutine body starts running, which
/// happens after the task is awaited or `get()` is called, so the referenced
/// object must still be alive at that point.
template <typename Socket, typename Object>
async::task<std::size_t> async_write_serialize(Socket& socket,
                                               Object const& object) {
  namespace net = std::experimental::net;

  const std::string payload = serialize(object);
  const std::uint64_t payload_size = payload.size();

  // Header first: the raw bytes of the 64-bit size.
  auto bytes_sent = co_await async::async_write(
      socket, net::buffer(&payload_size, sizeof(payload_size)));
  // Then the serialized body.
  bytes_sent += co_await async::async_write(socket, net::buffer(payload));

  co_return bytes_sent;
}
} // namespace ams::communication
#pragma once
#include <exception>
#include <optional>
#include <variant>
#include <experimental/coroutine>
namespace ams::async {
using std::experimental::coroutine_handle;
/// Awaiter returned from a task promise's `final_suspend`.
///
/// It always suspends the finished coroutine. If the promise has a recorded
/// waiter, that waiter is resumed from inside `await_suspend`.
template <typename Promise>
struct TaskAwaiter {
  Promise* me;

  bool await_ready() { return false; }

  void await_suspend(coroutine_handle<>) {
    if (!me->waiter.has_value()) {
      return;  // Nobody is waiting on this task.
    }
    (*me->waiter).resume();
  }

  void await_resume() {}
};

// Lets `TaskAwaiter{promise_ptr}` deduce the promise type.
template <typename Promise>
TaskAwaiter(Promise*) -> TaskAwaiter<Promise>;
template <typename T>
struct task {
struct promise_type {
std::variant<std::monostate, T, std::exception_ptr> result;
std::optional<coroutine_handle<>> waiter;
task get_return_object() { return task{*this}; }
std::experimental::suspend_always initial_suspend() { return {}; }
auto final_suspend() { return TaskAwaiter{this}; }
template <typename U>
void return_value(U&& value) {
result.template emplace<1>(std::forward<U>(value));
}
void set_exception(std::exception_ptr ptr) {
result.template emplace<2>(std::move(ptr));
}
void unhandled_exception() { std::terminate(); }
};
bool await_ready() { return false; }
void await_suspend(coroutine_handle<> caller_coro) {
coro.promise().waiter = caller_coro;
coro.resume();
}
T await_resume() {
if (coro.promise().result.index() == 2)
std::rethrow_exception(std::get<2>(coro.promise().result));
return std::get<1>(coro.promise().result);
}
void get() {
coro.promise().waiter.reset();
coro.resume();
}
private:
task(promise_type& p)
: coro(coroutine_handle<promise_type>::from_promise(p)) {}
coroutine_handle<promise_type> coro;
};
template <>
struct task<void> {
struct promise_type {
std::optional<std::exception_ptr> result;
std::optional<coroutine_handle<>> waiter;
task get_return_object() { return task{*this}; }
std::experimental::suspend_always initial_suspend() { return {}; }
auto final_suspend() { return TaskAwaiter{this}; }
void return_void() {}
void set_exception(std::exception_ptr ptr) { result = std::move(ptr); }
void unhandled_exception() { std::terminate(); }
};
bool await_ready() { return false; }
void await_suspend(coroutine_handle<> caller) {
coro.promise().waiter = caller;
coro.resume();
}
void await_resume() {
if (coro.promise().result) {
std::rethrow_exception(*coro.promise().result);
}
}
void get() {
coro.promise().waiter.reset();
coro.resume();
}
private:
task(promise_type& p)
: coro(coroutine_handle<promise_type>::from_promise(p)) {}
coroutine_handle<promise_type> coro;
};
} // namespace ams::async
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment