Created
April 12, 2018 17:10
-
-
Save th0rex/6e531093b7e7ac690732183201e714c2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <exception> | |
#include <variant> | |
#include <experimental/coroutine> | |
#include <experimental/net> | |
namespace ams::async { | |
using std::experimental::coroutine_handle; | |
template <typename Object, typename Function, typename Result> | |
struct Awaiter { | |
Object& _object; | |
Function _func; | |
std::variant<std::monostate, Result, std::error_code> _result; | |
Awaiter(Object& object, Function&& func) | |
: _object{object}, _func{std::forward<Function>(func)}, _result{} {} | |
bool await_ready() { return false; } | |
Result await_resume() { | |
if (_result.index() == 2) throw std::system_error(std::get<2>(_result)); | |
return std::get<1>(std::move(_result)); | |
} | |
void await_suspend(coroutine_handle<> coro) { | |
_func(_object, [this, coro](std::error_code const& ec, Result r) mutable { | |
if (!ec) { | |
_result.template emplace<1>(std::move(r)); | |
} else { | |
_result.template emplace<2>(ec); | |
} | |
coro.resume(); | |
}); | |
} | |
}; | |
template <typename Object, typename Function> | |
struct Awaiter<Object, Function, void> { | |
Object& _object; | |
Function _func; | |
std::variant<std::monostate, std::error_code> _result; | |
Awaiter(Object& object, Function&& func) | |
: _object{object}, _func{std::forward<Function>(func)}, _result{} {} | |
bool await_ready() { return false; } | |
void await_resume() { | |
if (_result.index() == 1) throw std::system_error(std::get<1>(_result)); | |
} | |
void await_suspend(coroutine_handle<> coro) { | |
_func(_object, [this, coro](std::error_code const& ec) mutable { | |
if (ec) { | |
this->_result.template emplace<1>(ec); | |
} | |
coro.resume(); | |
}); | |
} | |
}; | |
template <typename Result, typename Object, typename Function> | |
auto make_awaiter(Object& object, Function&& function) { | |
return Awaiter<Object, Function, Result>{object, | |
std::forward<Function>(function)}; | |
} | |
template <typename Acceptor> | |
auto async_accept(Acceptor& a) { | |
return make_awaiter<typename Acceptor::protocol_type::socket>( | |
a, [](Acceptor& acceptor, auto&& callback) { | |
acceptor.async_accept(std::forward<decltype(callback)>(callback)); | |
}); | |
} | |
template <typename Socket, typename Endpoint> | |
auto async_connect(Socket& socket, Endpoint ep) { | |
return make_awaiter<void>( | |
socket, [ep = std::move(ep)](Socket & socket, auto&& callback) { | |
socket.async_connect(ep, std::forward<decltype(callback)>(callback)); | |
}); | |
} | |
template <typename Socket, typename MutableBuffer> | |
auto async_read(Socket& socket, MutableBuffer buffer) { | |
return make_awaiter<const unsigned long>( | |
socket, [buffer = std::move(buffer)](Socket & socket, auto&& callback) { | |
std::experimental::net::async_read( | |
socket, buffer, std::forward<decltype(callback)>(callback)); | |
}); | |
} | |
template <typename Socket, typename ConstantBuffer> | |
auto async_write(Socket& socket, ConstantBuffer buffer) { | |
return make_awaiter<const unsigned long>( | |
socket, [buffer = std::move(buffer)](Socket & socket, auto&& callback) { | |
std::experimental::net::async_write( | |
socket, buffer, std::forward<decltype(callback)>(callback)); | |
}); | |
} | |
} // namespace ams::async |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <array> | |
#include <memory> | |
#include <sstream> | |
#include <string> | |
#include <cereal/archives/binary.hpp> | |
#include <async/async.hpp> | |
namespace ams::communication { | |
template <typename T> | |
T deserialize(std::unique_ptr<char[]> data) { | |
std::stringstream ss; | |
ss << data.get(); | |
cereal::BinaryInputArchive ar{ss}; | |
T message; | |
ar(message); | |
return message; | |
} | |
template <typename T> | |
std::string serialize(T const& data) { | |
std::stringstream ss; | |
cereal::BinaryOutputArchive ar{ss}; | |
ar(data); | |
return ss.str(); | |
}; | |
template <typename Object, typename Socket> | |
async::task<Object> async_read_deserialize(Socket& socket) { | |
std::array<char, sizeof(std::uint64_t)> length_buffer; | |
(void)co_await async::async_read( | |
socket, std::experimental::net::buffer(length_buffer)); | |
const auto length = *reinterpret_cast<std::uint64_t*>(length_buffer.data()); | |
auto data = std::make_unique<char[]>(length); | |
(void)co_await async::async_read( | |
socket, std::experimental::net::buffer(data.get(), length)); | |
co_return deserialize<Object>(std::move(data)); | |
} | |
template <typename Socket, typename Object> | |
async::task<std::size_t> async_write_serialize(Socket& socket, | |
Object const& object) { | |
const auto serialized_data = serialize(object); | |
const auto length = static_cast<std::uint64_t>(serialized_data.size()); | |
const auto* length_buffer = reinterpret_cast<const char*>(&length); | |
auto total_written = co_await async::async_write( | |
socket, std::experimental::net::buffer(length_buffer, sizeof(length))); | |
total_written += co_await async::async_write( | |
socket, std::experimental::net::buffer(serialized_data)); | |
co_return total_written; | |
} | |
} // namespace ams::communication |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <exception> | |
#include <optional> | |
#include <variant> | |
#include <experimental/coroutine> | |
namespace ams::async { | |
using std::experimental::coroutine_handle; | |
template <typename Promise> | |
struct TaskAwaiter { | |
Promise* me; | |
bool await_ready() { return false; } | |
void await_suspend(coroutine_handle<>) { | |
if (me->waiter) { | |
me->waiter->resume(); | |
} | |
} | |
void await_resume() {} | |
}; | |
template <typename Promise> | |
TaskAwaiter(Promise*)->TaskAwaiter<Promise>; | |
template <typename T> | |
struct task { | |
struct promise_type { | |
std::variant<std::monostate, T, std::exception_ptr> result; | |
std::optional<coroutine_handle<>> waiter; | |
task get_return_object() { return task{*this}; } | |
std::experimental::suspend_always initial_suspend() { return {}; } | |
auto final_suspend() { return TaskAwaiter{this}; } | |
template <typename U> | |
void return_value(U&& value) { | |
result.template emplace<1>(std::forward<U>(value)); | |
} | |
void set_exception(std::exception_ptr ptr) { | |
result.template emplace<2>(std::move(ptr)); | |
} | |
void unhandled_exception() { std::terminate(); } | |
}; | |
bool await_ready() { return false; } | |
void await_suspend(coroutine_handle<> caller_coro) { | |
coro.promise().waiter = caller_coro; | |
coro.resume(); | |
} | |
T await_resume() { | |
if (coro.promise().result.index() == 2) | |
std::rethrow_exception(std::get<2>(coro.promise().result)); | |
return std::get<1>(coro.promise().result); | |
} | |
void get() { | |
coro.promise().waiter.reset(); | |
coro.resume(); | |
} | |
private: | |
task(promise_type& p) | |
: coro(coroutine_handle<promise_type>::from_promise(p)) {} | |
coroutine_handle<promise_type> coro; | |
}; | |
template <> | |
struct task<void> { | |
struct promise_type { | |
std::optional<std::exception_ptr> result; | |
std::optional<coroutine_handle<>> waiter; | |
task get_return_object() { return task{*this}; } | |
std::experimental::suspend_always initial_suspend() { return {}; } | |
auto final_suspend() { return TaskAwaiter{this}; } | |
void return_void() {} | |
void set_exception(std::exception_ptr ptr) { result = std::move(ptr); } | |
void unhandled_exception() { std::terminate(); } | |
}; | |
bool await_ready() { return false; } | |
void await_suspend(coroutine_handle<> caller) { | |
coro.promise().waiter = caller; | |
coro.resume(); | |
} | |
void await_resume() { | |
if (coro.promise().result) { | |
std::rethrow_exception(*coro.promise().result); | |
} | |
} | |
void get() { | |
coro.promise().waiter.reset(); | |
coro.resume(); | |
} | |
private: | |
task(promise_type& p) | |
: coro(coroutine_handle<promise_type>::from_promise(p)) {} | |
coroutine_handle<promise_type> coro; | |
}; | |
} // namespace ams::async |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment