Last active
July 3, 2018 12:51
-
-
Save pyos/052ef69e20d22521dc5275fae237d00d to your computer and use it in GitHub Desktop.
libcno and coroutine-based echo server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "cone.h"
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include <cno/core.h>
// Start of the mun error-code range used for errors reported by libcno:
// cno2mun() raises libcno failures as `mun_errno_cno + <cno error code>`.
static constexpr int mun_errno_cno = mun_errno_custom + 19000;
// The libcno-side error code meaning "the failure came from mun"; cno2mun()
// compares against it. Presumably also what `CNO_ERROR(MUN, ...)` expands to — confirm.
static constexpr int CNO_ERRNO_MUN = mun_errno_cno;
namespace cno { | |
struct buffer { | |
buffer(size_t limit = 8192) : limit(limit) {} | |
mun::status read(std::string &out) { | |
for (std::string *p; !read(p);) | |
return false; | |
out = std::move(data); | |
return consume(data.size()); | |
} | |
mun::status read(std::string *&out) { | |
while (data.empty() && !eof) | |
if (!more.wait().raise(HERE)) | |
return false; | |
out = &data; | |
return true; | |
} | |
mun::status consume(size_t n) { | |
data.erase(0, n); | |
return less.wake().raise(HERE); | |
} | |
mun::status write(mun::stringview v) { | |
for (size_t offset = 0, chunk = 0; offset < v.size; offset += chunk) { | |
data.append(v.base + offset, chunk = std::min(v.size - offset, limit - data.size())); | |
if (!more.wake().raise(HERE) || (chunk < v.size - offset && !less.wait().raise(HERE))) | |
return false; | |
} | |
return true; | |
} | |
mun::status close() { | |
eof = true; | |
return more.wake().raise(HERE); | |
} | |
private: | |
size_t limit; | |
bool eof = false; | |
std::string data; | |
cone::event more; | |
cone::event less; | |
}; | |
struct header : cno_header_t { | |
header(mun::stringview k, mun::stringview v) : cno_header_t{{k.base, k.size}, {v.base, v.size}, 0} {} | |
}; | |
struct message : cno::buffer { | |
int code; | |
std::string method; | |
std::string path; | |
std::unordered_multimap<std::string, std::string> headers; | |
message(const struct cno_message_t &m) { | |
code = m.code; | |
method = std::string{m.method.data, m.method.size}; | |
path = std::string{m.path.data, m.path.size}; | |
for (auto *h = m.headers; h != &m.headers[m.headers_len]; h++) | |
headers.emplace(std::string{h->name.data, h->name.size}, std::string{h->value.data, h->value.size}); | |
} | |
}; | |
// Translates a mun status into a libcno return code: success becomes CNO_OK,
// failure becomes a libcno "MUN" error whose text points at mun::error::last().
static mun::status mun2cno(mun::status c = false) {
    if (c)
        return CNO_OK;
    return CNO_ERROR(MUN, "see mun::error::last() for details");
}
// Translates a libcno result into a mun status.
//
// On failure, unless the libcno error is the "MUN" marker (presumably meaning a
// mun error has already been recorded — confirm), the libcno error is raised
// as a mun error with code `mun_errno_cno + <cno code>`, and every further
// libcno traceback entry is appended as a stack frame.
static mun::status cno2mun(mun::status c = false) {
    auto &err = *cno_error();
    const bool failed = !c;
    if (failed && err.code != CNO_ERRNO_MUN) {
        // The first traceback entry carries the error code and message.
        c.raise({err.traceback->file, "?", static_cast<unsigned>(err.traceback->line)},
                mun_errno_cno + err.code, "cno", "%d: %s", err.code, err.text);
        // The remaining entries only add frames; libcno records no function names.
        auto *frame = &err.traceback[1];
        while (frame != err.traceback_end) {
            c.raise({frame->file, "?", static_cast<unsigned>(frame->line)});
            ++frame;
        }
    }
    return c;
}
struct stream { | |
using handler = std::function<mun::status(message &, stream &)>; | |
stream(struct cno_connection_t &conn, uint32_t id) : conn(conn), id(id) {} | |
mun::status head(int code, const std::vector<cno::header> &hs) { | |
while (messages.front().c.wait().raise(HERE)) {} | |
if (mun::error::last().code != mun_errno_deadlock) | |
return false; | |
struct cno_message_t cm = {code, {}, {}, (struct cno_header_t *)&hs[0], hs.size()}; | |
return cno2mun(cno_write_message(&conn, id, &cm, 0)).raise(HERE); | |
} | |
mun::status push(mun::stringview method, mun::stringview path, const std::vector<header> &hs) { | |
struct cno_message_t cm = {0, {(char *) method.base, method.size}, {(char *) path.base, path.size}, (struct cno_header_t *)&hs[0], hs.size()}; | |
return cno2mun(cno_write_push(&conn, id, &cm)).raise(HERE); | |
} | |
mun::status write(mun::stringview buf) { | |
return wbuffer.append(buf.base, buf.size).size() == buf.size ? on_flow() : mun::status{true}; | |
} | |
mun::status reset(enum CNO_RST_STREAM_CODE code = CNO_RST_CANCEL) { | |
return cno2mun(cno_write_reset(&conn, id, code)).raise(HERE); | |
} | |
mun::status on_message(const struct cno_message_t &msg, cno::stream::handler &h) { | |
return messages.emplace_back(this, &h, msg), true; | |
} | |
mun::status on_data(mun::stringview data) { | |
return messages.empty() || messages.back().m.write(data); | |
} | |
mun::status on_end() { | |
return messages.empty() || messages.back().m.close(); | |
} | |
mun::status on_flow() { | |
for (ssize_t written; wbuffer.size(); wbuffer.erase(0, written)) { | |
if (!cno2mun((written = cno_write_data(&conn, id, wbuffer.data(), wbuffer.size(), 0)) >= 0).raise(HERE)) | |
return false; | |
if (written == 0) | |
return true; | |
} | |
return flushed.wake().raise(HERE); | |
} | |
private: | |
struct task { | |
cno::stream *s; | |
cno::stream::handler *h; | |
cno::message m; | |
struct cone c; | |
task(cno::stream *s, cno::stream::handler *h, const struct cno_message_t &m) : s(s), h(h), m(m), c(*this) {} | |
mun::status operator()() { | |
if (!(*h)(m, *s).raise(HERE) || (!s->wbuffer.empty() && !s->flushed.wait().raise(HERE))) | |
return mun::error::last().code == mun_errno_cno + CNO_ERRNO_DISCONNECT; | |
c.detach(); | |
cno::stream &sr = *s; | |
s->messages.erase(s->messages.begin()); // destroys `this` | |
return cno2mun(cno_write_data(&sr.conn, sr.id, "", 0, 1)).raise(HERE); // may destroy `sr` | |
} | |
}; | |
struct cno_connection_t &conn; | |
const uint32_t id; | |
cone::event flushed; | |
std::string wbuffer; | |
std::vector<task> messages; | |
}; | |
struct server : cno_connection_t { | |
static mun::status run(mun::fd fd, cno::stream::handler handler) { | |
cno::server server{handler}; | |
struct cone writer{[&server, &fd]() -> mun::status { | |
ssize_t w; | |
for (std::string *s = nullptr; server.buffer.read(s).raise(HERE); server.buffer.consume(w)) | |
if (!mun::status{(w = ::write(fd, &(*s)[0], s->size())) >= 0}.os(HERE)) | |
return errno == EPIPE; | |
return false; | |
}}; | |
if (!cno2mun(cno_connection_made(&server, CNO_HTTP1)).raise(HERE)) | |
return false; | |
char buf[8192]; | |
for (ssize_t rd; mun::status{(rd = ::read(fd, buf, sizeof(buf))) >= 0}.os(HERE);) { | |
if (rd == 0) | |
return cno2mun(cno_connection_lost(&server)).raise(HERE); | |
if (!cno2mun(cno_connection_data_received(&server, buf, (size_t)rd)).raise(HERE)) | |
return false; | |
} | |
return false; | |
} | |
private: | |
cno::buffer buffer; | |
cno::stream::handler &handler; | |
std::unordered_map<uint32_t, std::unique_ptr<stream>> streams; | |
server(cno::stream::handler &h) noexcept : handler(h) { | |
cno_connection_init(this, CNO_SERVER); | |
cb_data = this; | |
on_write = (decltype(on_write)) &_on_write; | |
on_stream_start = (decltype(on_stream_start)) &_on_stream; | |
on_stream_end = (decltype(on_stream_end)) &_on_stream_end; | |
on_message_start = (decltype(on_message_start)) &_on_message; | |
on_message_data = (decltype(on_message_data)) &_on_message_data; | |
on_message_end = (decltype(on_message_end)) &_on_message_end; | |
on_flow_increase = (decltype(on_flow_increase)) &_on_flow; | |
} | |
~server() { | |
cno_connection_reset(this); | |
} | |
static int _on_write(server *p, const char *data, size_t size) { | |
return mun2cno(p->buffer.write({data, size}).raise(HERE)); | |
} | |
static int _on_stream(server *p, uint32_t id) { | |
return p->streams[id] = std::make_unique<cno::stream>(*p, id), CNO_OK; | |
} | |
static int _on_stream_end(server *p, uint32_t id) { | |
return p->streams.erase(id), CNO_OK; | |
} | |
static int _on_message(server *p, uint32_t id, const struct cno_message_t *msg) { | |
return mun2cno(p->streams[id]->on_message(*msg, p->handler).raise(HERE)); | |
} | |
static int _on_message_data(server *p, uint32_t id, const char *data, size_t size) { | |
return mun2cno(p->streams[id]->on_data({data, size}).raise(HERE)); | |
} | |
static int _on_message_end(server *p, uint32_t id) { | |
return mun2cno(p->streams[id]->on_end().raise(HERE)); | |
} | |
static int _on_flow(server *p, uint32_t id) { | |
if (id) | |
return mun2cno(p->streams[id]->on_flow().raise(HERE)); | |
for (auto &it : p->streams) | |
if (!it.second->on_flow().raise(HERE)) | |
return mun2cno(); | |
return CNO_OK; | |
} | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <unistd.h> | |
#include <sys/socket.h> | |
#include <atomic> | |
#include <memory> | |
#include <string> | |
#include <thread> | |
extern "C" { | |
#define _Atomic(T) std::atomic<T> | |
#include <cone/cone.h> | |
#undef _Atomic | |
#undef cone | |
} | |
namespace mun { | |
#define HERE MUN_CURRENT_FRAME | |
struct error : mun_error { | |
using frame = struct mun_stackframe; | |
static error &last() noexcept { | |
return *reinterpret_cast<error*>(mun_last_error()); | |
} | |
void show(const char *prefix = "runtime") const noexcept { | |
mun_error_show(prefix, this); | |
} | |
}; | |
struct status { | |
constexpr status(int i) noexcept : ok(i >= 0) {} | |
constexpr status(bool b) noexcept : ok(b) {} | |
constexpr operator int() const noexcept { return ok ? 0 : -1; } | |
constexpr operator bool() const noexcept { return ok; } | |
template <typename... Args> | |
mun::status raise(mun::error::frame f, int code, const char *name, Args&&... args) const noexcept { | |
return ok ? 0 : mun_error_at(code, name, f, args...); | |
} | |
mun::status os(mun::error::frame f) const noexcept { | |
return ok ? 0 : mun_error_at(-errno, "errno", f, "OS error"); | |
} | |
mun::status raise(mun::error::frame f) const noexcept { | |
return ok ? 0 : mun_error_up(f); | |
} | |
private: | |
const bool ok; | |
}; | |
// A non-owning (pointer, length) view of a run of characters.
// The viewed characters must outlive the view.
struct stringview {
    const char *base; size_t size;

    // Views a NUL-terminated C string. A null pointer is treated as an empty
    // string instead of being passed to strlen(), which would be undefined.
    stringview(const char *b = "") noexcept : base(b ? b : ""), size(b ? strlen(b) : 0) {}

    // Views exactly `s` characters starting at `b`.
    stringview(const char *b, size_t s) noexcept : base(b), size(s) {}

    // Views the contents of a std::string.
    stringview(const std::string &s) noexcept : base(s.data()), size(s.size()) {}
};
struct fd { | |
fd(int n = -1) noexcept : n(n) {} | |
fd(fd &&x) noexcept : n(x.n) { x.n = -1; } | |
fd(const fd &) = delete; | |
fd &operator=(fd &&x) noexcept { return std::swap(n, x.n), *this; } | |
fd &operator=(const fd &) = delete; | |
~fd() { if (n >= 0) ::close(n); } | |
operator int() const noexcept { return n; } | |
operator bool() const noexcept { return n >= 0; } | |
mun::status unblock() noexcept { | |
return cone_unblock(n); | |
} | |
private: | |
int n; | |
}; | |
} | |
struct cone { | |
cone() = default; | |
cone(cone&&) = default; | |
cone &operator=(cone&&) = default; | |
template <typename F> | |
explicit cone(F &f, size_t stack = CONE_DEFAULT_STACK) { | |
ptr.reset(cone_spawn(stack, cone_bind((&body<F, F*>), &f))); | |
} | |
template <typename F> | |
explicit cone(F &&f, size_t stack = CONE_DEFAULT_STACK) { | |
auto g = std::make_unique<F>(std::forward<F>(f)); | |
if (ptr.reset(cone_spawn(stack, cone_bind((&body<std::remove_reference_t<F>, decltype(g)>), g.get()))), ptr) | |
g.release(); | |
} | |
~cone() { | |
if (ptr && !(cancel() && wait())) | |
mun::error::last().show("cone destructor"); | |
} | |
operator bool() const noexcept { | |
return !!ptr; | |
} | |
void detach() noexcept { | |
ptr = nullptr; | |
} | |
mun::status wait(bool rethrow = false) const noexcept { | |
return cone_cowait(ptr.get(), rethrow ? 0 : CONE_NORETHROW); | |
} | |
mun::status cancel() const noexcept { | |
return cone_cancel(ptr.get()); | |
} | |
private: | |
std::unique_ptr<struct cone, int(*)(struct cone *) noexcept> ptr{nullptr, cone_drop}; | |
template <typename F, typename Fw> | |
static int body(F *f) noexcept { | |
return (*Fw{f})(); | |
} | |
public: | |
struct event : private cone_event { | |
event() noexcept : cone_event() {} | |
event(event &&e) { std::swap(*this, e); } | |
event(const event &) = delete; | |
event &operator=(event &&e) { return std::swap(*this, e), *this; } | |
event &operator=(const event &) = delete; | |
~event() { | |
mun_vec_fini(this); | |
} | |
mun::status wait(const std::atomic<unsigned> &a, unsigned expect) noexcept { | |
return cone_wait(this, &a, expect); | |
} | |
mun::status wait() noexcept { | |
return wait(std::atomic<unsigned>{0}, 0); | |
} | |
mun::status wake(size_t n = std::numeric_limits<size_t>::max()) noexcept { | |
return cone_wake(this, n); | |
} | |
}; | |
struct thread { | |
thread() noexcept = default; | |
thread(thread &&) = default; | |
thread &operator=(thread &&) = default; | |
template <typename F> | |
thread(F &&f, size_t stack = CONE_DEFAULT_STACK) { | |
int p[2]; | |
::socketpair(SOCK_STREAM, AF_UNIX, 0, p); | |
signal = mun::fd{p[0]}; | |
signal.unblock(); | |
handle = std::thread(&body<std::remove_reference_t<F>>, std::pair<std::remove_reference_t<F>, mun::fd>{std::forward<F>(f), p[1]}, stack); | |
} | |
~thread() { | |
if (signal && !(cancel() && wait())) | |
mun::error::last().show("thread destructor"); | |
} | |
mun::status wait() { | |
for (char buf; !mun::status{::read(signal, &buf, 1) >= 0}.os(HERE);) | |
return false; | |
return handle.join(), true; | |
} | |
mun::status cancel() noexcept { | |
return mun::status{::write(signal, "", 1) == 1}.os(HERE); | |
} | |
private: | |
mun::fd signal; | |
std::thread handle; | |
template <typename F> | |
static void body(std::pair<F, mun::fd> f, size_t stack) noexcept { | |
if (cone_loop(stack, cone_bind(corobody<F>, &f))) | |
mun::error::last().show("worker thread"); | |
} | |
template <typename F> | |
static int corobody(std::pair<F, mun::fd> *f) noexcept { | |
struct cone sighandler{[f, c = ::cone]() -> mun::status { | |
for (char buf; !mun::status{f->second.unblock() && ::read(f->second, &buf, 1) >= 0}.os(HERE);) | |
return false; | |
return cone_cancel(c); | |
}}; | |
return cone::body<F, F*>(&f->first); | |
} | |
}; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// git clone https://github.com/pyos/libcno
// git clone https://github.com/pyos/cone
// make -C libcno
// make -C cone CFLAGS=-O3
// g++ -O3 -Wall -Wextra -Ilibcno -I. -Llibcno/obj -std=c++14 server.cc cone/obj/{cone,cold,mun}.o -lcno -o server
#include <iostream> | |
#include "cno.h" | |
#include <signal.h> | |
#include <sys/socket.h> | |
#include <sys/select.h> | |
#include <netinet/in.h> | |
#include <netinet/tcp.h> | |
// Serves one client connection as an echo server: every request is answered
// with a 200 response whose body is the request payload (no body for HEAD).
// A request whose payload reaches 100000 bytes is answered with an empty 400.
static mun::status echo(mun::fd client) {
    const auto handler = [](cno::message &msg, cno::stream &stream) -> mun::status {
        std::string payload;
        std::string part;
        while (msg.read(part).raise(HERE)) {
            // An empty read marks the end of the request payload.
            if (part.empty()) {
                const std::string length = std::to_string(payload.size());
                if (!stream.head(200, {{"server", "libcno/1.0"}, {"cache-control", "no-cache"}, {"content-length", length}}))
                    return false;
                return msg.method == "HEAD" || stream.write(payload);
            }
            if (payload.size() + part.size() >= 100000)
                return stream.head(400, {{"content-length", "0"}});
            payload += part;
        }
        return false;
    };
    return cno::server::run(std::move(client), handler);
}
namespace io { | |
static const int ONE = 1; | |
static mun::fd ipv6_server(unsigned char iface[16], uint32_t port, int backlog = 127) { | |
mun::fd r = ::socket(AF_INET6, SOCK_STREAM, 0); | |
if (!mun::status{r && r.unblock() && !setsockopt(r, SOL_SOCKET, SO_REUSEADDR, &ONE, sizeof(int))}.os(HERE)) | |
return -1; | |
struct sockaddr_in6 addr = {}; | |
addr.sin6_family = AF_INET6; | |
addr.sin6_port = htons(port); | |
memcpy(addr.sin6_addr.s6_addr, iface, 16); | |
if (!mun::status{!bind(r, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)) && !listen(r, backlog)}.os(HERE)) | |
return -1; | |
return r; | |
} | |
template <typename F> | |
static mun::status tcp_server(mun::fd &fd, F &&handler) { | |
while (true) { | |
mun::fd child = accept(fd, NULL, NULL); | |
if (!mun::status{child && child.unblock() && !setsockopt(child, IPPROTO_TCP, TCP_NODELAY, &ONE, sizeof(int))}.os(HERE)) | |
return -1; | |
(struct cone){std::bind(handler, std::bind(std::move<mun::fd&>, std::move(child)))}.detach(); | |
} | |
} | |
} | |
// Pipe used to carry signal numbers out of the signal handler:
// `first` is the read end, `second` the write end. Created during static
// initialization; throws if the pipe cannot be created.
static auto sigpipe = []() -> std::pair<mun::fd, mun::fd> {
    int p[2];
    if (::pipe(p) != 0)
        throw std::runtime_error("could not create a signal pipe");
    return {p[0], p[1]};
}();
// Signal handler: forwards the signal number to the main loop through the
// write end of `sigpipe`.
static void sighandle(int num) {
    // write() may overwrite errno; put it back so that the interrupted code
    // does not observe a value it did not produce.
    const int saved_errno = errno;
    // Nothing useful can be done about a failed write from inside a handler.
    const ssize_t ignored = ::write(sigpipe.second, &num, sizeof(int));
    (void) ignored;
    errno = saved_errno;
}
int main() { | |
signal(SIGINT, &sighandle); | |
signal(SIGPIPE, SIG_IGN); | |
unsigned char all[16] = {}; | |
if (mun::fd server = io::ipv6_server(all, 8000)) { | |
std::vector<cone::thread> workers; | |
for (int i = 0; i < 4; i++) | |
workers.emplace_back([&server]() { return io::tcp_server(server, echo); }); | |
sigpipe.first.unblock(); | |
for (int num; ::read(sigpipe.first, &num, sizeof(int)) > 0;) | |
if (num == SIGINT) | |
break; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment