Last active
June 21, 2020 09:43
-
-
Save markpapadakis/eab3a2f75f1e1073f4b4 to your computer and use it in GitHub Desktop.
A very simple (first take) implementation of stack-less coroutines/actors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://gist.github.com/markpapadakis/8dba5c480c13b12a056e (example) | |
// https://medium.com/@markpapadakis/high-performance-services-using-coroutines-ac8e9f54d727 | |
#include <switch.h> | |
#include <switch_print.h> | |
#include <switch_ll.h> | |
#include <switch_bitops.h> | |
#include <md5.h> | |
#include <text.h> | |
#include <network.h> | |
// Computed gotos give faster dispatch and lift the restrictions on the code that can be placed inside a switch {}
#define COROS_HAVE_COMPUTEDLABELSDISPATCH | |
class CorosScheduler; | |
// Caveats/gotchas: | |
// 1. You can't rely on the stack (these are stackless coroutines/actors), so you need to use coroutine members for persisting state,
// e.g. for (uint32_t i = 0; i != 10; ++i) { YieldCoro(); Print(i, "\n"); } will not work,
// whereas if you have a member uint32_t i, then for (i = 0; i != 10; ++i) { YieldCoro(); Print(i, "\n"); } will work fine
// | |
// 2. If your coroutine calls a function/functor and it, or another functor in the call stack, wants to yield, then the functor must
// be converted into a coroutine instead and scheduled via WaitCoro()
// | |
// 3. Because labels are used as return points, you can't create/initialize vars in BeginCoro() .. EndCoro() unless | |
// they are inside { }. This shouldn't be much of an issue. | |
// | |
// Benefits: | |
// Very lightweight coroutines. Because they are stackless, they can run on any thread, and it's fairly trivial to optimize CorosScheduler::Run() to
// e.g. consider the type of coro and move it to a background thread (e.g. for disk I/O), dequeue from a global/per-thread coroutines queue (submitted work), etc.
// Priorities allow for an interesting scheme; this is based on the Linux kernel O(1) scheduler implementation semantics.
// | |
// You can also use thread_local freelists of various coroutine instances, as opposed to deleting them when completed, if you
// are creating many 1000s/second and that becomes an issue. It's not implemented in this prototype, but it should be trivial to support that
// kind of reuse semantics.
struct coroutine | |
{ | |
friend class CorosScheduler; | |
using runres_t = coroutine *; | |
#ifdef COROS_HAVE_COMPUTEDLABELSDISPATCH | |
using resume_token_t = const void *; | |
#else | |
using resume_token_t = uint16_t; | |
#endif | |
struct coroutine | |
{ | |
protected: | |
switch_dlist schedulerList; | |
resume_token_t resumeToken{0}; | |
coroutine *parent{nullptr}; | |
// 8 different prioerities, from 0(lowest) to 7(highest) | |
// higher priority coros are executed first. | |
// e.g no coros of priority 0 will run unless there are no more runnable coros of priority 1 or higher | |
uint8_t prio{0}; | |
#pragma mark BEGIN:Coroutiens API | |
#ifdef COROS_HAVE_COMPUTEDLABELSDISPATCH | |
// Place at the beginning of your operato() impl. | |
#define BeginCoro() if (!resumeToken) { resumeToken = &&___coroEntry; } else { goto *resumeToken; } ___coroEntry: | |
// Place at the end of your operator() impl. | |
#define EndCoro() return 0 | |
#define YieldCoroImpl(res) resumeToken = &&__macro_concat(___coroLabel, __LINE__); return (runres_t)(res); __macro_concat(___coroLabel, __LINE__): | |
// Yield the coro, but next time it it's scheduled, it will be restarted as opposed to continue from where RestartCoro() has been invoked | |
// This is e.g useful for when you have a coro for network I/O, and you want to restart from beginning whenever it runs | |
#define RestartCoro() resumeToken = nullptr; return (runres_t)2 | |
#else | |
#define BeginCoro() switch (resumeToken) { case 0: | |
#define EndCoro() } return 0 | |
#define YieldCoroImpl(res) resumeToken = __LINE__; return (runres_t)(res); case __LINE__: | |
#define RestartCoro() resumeToken = 0; return (runres_t)2 | |
#endif | |
// Exit coro - won't reschedule; will delete/free it | |
#define ExitCoro() return 0 | |
// Yield to antother(if any) runnable coro. | |
// Will exit and return control back to the scheduler, which will place it back in runnable queues, but will | |
// choose another coro to run | |
#define YieldCoro() YieldCoroImpl(1) | |
// Will yield, but will not be placed back in the runnable queue sto run again | |
// Instead `c` will be scheduled and when it's done, then this coro will be placed back in the runnable queues to run again | |
#define WaitCoro(c) YieldCoroImpl(c) | |
#define WaitCoroWithPrio(c, p) YieldCoroImpl(coroutine::SetPrio(c, p)) | |
// Will yield, but coro will not be placed in the runnable queues. It will not be deleted, and is | |
// expected to be scheduled back again later. | |
// Useful for when e.g you want to 'block' this thread until say a network event comes in in which case your network I/O logic matches it | |
// with this coro and schedules it back in. Useful for rate, special-case workloads | |
// WaitCoroWithPrio() is a handy macro for also setting priority to coro | |
#define FreezeCoro() YieldCoroImpl(3) | |
// You may want to designate other scalar/constants as return values for YieldCoroImpl(). e.g 5 for schedule to bg thread, or 10 for reschedule again after 1 minute, etc. | |
#pragma mark END:Coroutines API | |
public: | |
coroutine(void) | |
{ | |
switch_dlist_init(&schedulerList); | |
} | |
virtual ~coroutine(void) | |
{ | |
} | |
virtual runres_t operator()(void) = 0; | |
_ALWAYS_inline_ auto Priority(void) -> uint8_t | |
{ | |
return prio; | |
} | |
void SetPriority(const uint8_t p) | |
{ | |
assert(p < 8); | |
prio = p; | |
} | |
// See: WaitCoroWithPrio() | |
static _ALWAYS_inline_ coroutine *SetPrio(coroutine *const c, const uint8_t p) | |
{ | |
c->SetPriority(p); | |
return c; | |
} | |
}; | |
// An example scheduler implementation | |
// For an alternative impl. see Run() comments | |
// A more sophisticated scheduler would run runnable coros, and also dequeue from a coros queue submitted by other threads, | |
// support delayed re-scheduling, etc. | |
// | |
// The overhead compared to not using coros is <= 1us | |
class CorosScheduler | |
{ | |
private: | |
switch_dlist corosList[8]; // Multiple runnable queues, for each supported prio | |
uint32_t blockedCoros{0}; // How many coros are blocked(created, not runnable) waiting for a child coro to complete | |
uint8_t runnableMask{0}; | |
private: | |
void ScheduleCoro(coroutine *const c) | |
{ | |
const auto p = c->Priority(); | |
if (unlikely(p >= 8)) | |
{ | |
// Low-priority; move to another background thread(e.g disk I/O operation) | |
// IMPLEMENT_ME | |
} | |
else | |
{ | |
switch_dlist_insert_before(&corosList[p], &c->schedulerList); | |
runnableMask|=(1U<<p); | |
} | |
} | |
void ScheduleCoroUnsafe(coroutine *const c) | |
{ | |
// TODO: IMPLEMENT ME | |
} | |
// Selects the next runnable; will choose from the highest priority with any runnables queues | |
coroutine *NextRunnable(void) | |
{ | |
if (!runnableMask) | |
{ | |
// Nothing in any runnable queue | |
return nullptr; | |
} | |
else | |
{ | |
const auto p = 7 - SwitchBitOps::LeadingZeros(runnableMask); | |
auto &l = corosList[p]; | |
auto *const coro= switch_list_entry(coroutine, schedulerList, l.next); | |
switch_dlist_del_and_reset(&coro->schedulerList); | |
if (switch_dlist_isempty(&l)) | |
{ | |
// This runnable queue is now empty | |
runnableMask&=~(1U << p); | |
} | |
return coro; | |
} | |
} | |
void FreeCoro(coroutine *const coro) | |
{ | |
// TODO: maintain thread_local freeList for coro->Type() | |
delete coro; | |
} | |
void RunCoro(coroutine *const coro) | |
{ | |
const auto r = (*coro)(); | |
switch ((uintptr_t)r) | |
{ | |
case 0: | |
// Coro has endeded | |
if (coro->parent) | |
{ | |
// Has a parent waiting for this coro's completion to resume | |
--blockedCoros; | |
ScheduleCoro(coro->parent); | |
} | |
FreeCoro(coro); | |
break; | |
case 1: | |
// Yielded, place back in runnable, but choose another coro now, if available | |
// See YieldCoro() | |
ScheduleCoro(coro); | |
break; | |
case 2: | |
// Finished, but it wants to run again as soon as it can | |
// This can be useful, though not sure how yet;) | |
// See RestartCoro() | |
ScheduleCoro(coro); | |
break; | |
case 3: | |
// Co-ro is frozen. That is, its not runnable, but we won't delete it | |
// The idea is that another thread or another coro will eventually make it runnable again later | |
// This is for some edge-cases where you need to freeze the coro, e.g run soemthing on another thread(not as a coro) and then | |
// notify the thread scheduler to schedule it back in | |
// See FreezeCoro() | |
break; | |
default: | |
// Coro is waiting for another coro, setup parenthood and schedule it (potentially to another thread?) | |
// See WaitCoro() | |
++blockedCoros; | |
r->parent = coro; | |
r->resumeToken = 0; | |
ScheduleCoro(r); | |
break; | |
} | |
} | |
bool RunNextRunnable(void) | |
{ | |
if (auto *const coro = NextRunnable()) | |
{ | |
RunCoro(coro); | |
return true; | |
} | |
else | |
return false; | |
} | |
inline bool AnyRunnable(void) const | |
{ | |
return runnableMask; | |
} | |
inline auto Blocked(void) const -> uint32_t | |
{ | |
return blockedCoros; | |
} | |
public: | |
CorosScheduler(void) | |
{ | |
for (auto &it : corosList) | |
switch_dlist_init(&it); | |
} | |
static void ScheduleInThreadScheduler(coroutine *const c) | |
{ | |
// Schedule in current thread scheduler | |
// TODO: implement me | |
} | |
void Schedule(coroutine *const c, const uint8_t prio = 0) | |
{ | |
ScheduleCoro(c); | |
c->SetPriority(prio); | |
c->resumeToken = 0; | |
} | |
// `c` ran on another thread and now it's done, and that thred handed it off back to us | |
// We need to check if it has a parent, and if it does, make it runnable again(was waiting for `c`) | |
void ProcessCompletedInAnotherThread(coroutine *const c) | |
{ | |
if (auto *const parent = c->parent) | |
{ | |
// submit into e.g thread-specific MPSQ queue to be dequeued later by e.g TryDequeSubmittedCoro() or DequeueSubmittedCoro() | |
ScheduleCoroUnsafe(parent); | |
} | |
delete c; | |
} | |
virtual void Run(void) | |
{ | |
// An alternative implementation of this method would | |
// also try dequeing from a thread-specific tasks queue, or a global queue, or whatever else | |
// e.g | |
// | |
// for (;;) | |
// { | |
// (void)RunNextRunnable(); | |
// | |
// if (AnyRunnable()) | |
// { | |
// // At least one runnable, don't block waiting for external work | |
// if (auto *const c = TryDequeSubmittedCoro()) | |
// ScheduleCoro(c); | |
// } | |
// else | |
// { | |
// // No runnables, block waiting for external work if needed | |
// ScheduleCoro(DequeueSubmittedCoro()); | |
// } | |
//} | |
// See also: https://gist.github.com/markpapadakis/8dba5c480c13b12a056e | |
// | |
// We could also implement network I/O poll as another coro with the lowest priority which | |
// returns control to the scheduler with RestartCoro(). See example in this file | |
// | |
// This is optimal for services that accept and manage connections I/O and also execute their requests | |
while (RunNextRunnable()) | |
{ | |
} | |
} | |
}; | |
// File-local scheduler instance used by main() below.
// https://twitter.com/ID_AA_Carmack/status/575788622554628096
static CorosScheduler TheScheduler{};
struct singer_coro | |
: public coroutine | |
{ | |
resume_res_t operator()(void) override | |
{ | |
BeginCoro(); | |
Print("Singing\n"); | |
EndCoro(); | |
} | |
}; | |
struct diskreader_coro | |
: public coroutine | |
{ | |
strwlen32_t *out; // dummy | |
diskreader_coro(strwlen32_t *const o) | |
: out(o) | |
{ | |
} | |
resume_res_t operator()(void) override | |
{ | |
BeginCoro(); | |
out->Set(_S("Hello World")); | |
EndCoro(); | |
} | |
}; | |
struct dancer_coro | |
: public coroutine | |
{ | |
strwlen32_t localBuf; | |
uint32_t i; | |
resume_res_t operator()(void) override | |
{ | |
BeginCoro(); | |
Print("Dancing\n"); | |
YieldCoro(); | |
Print("Did Sing!\n"); | |
// Get some data into localBuf | |
// maybe this would block accessing the disk or whatever else. | |
// WaitCoro() will put this coro to sleep, waiting until another coro runs and then | |
// it's made runnable again | |
WaitCoro(new diskreader_coro(&localBuf)); | |
Print("Got:", localBuf, "\n"); | |
for (i = 0; i != 10; ++i) | |
{ | |
Print("i = ", i, "\n"); | |
if (i == 5) | |
ExitCoro(); | |
else | |
YieldCoro(); | |
} | |
EndCoro(); | |
} | |
}; | |
struct reader_coro | |
: public coroutine | |
{ | |
int fd; | |
const uint64_t offset, len; | |
void *const buf; | |
reader_coro(int _fd, void *const _buf, const uint64_t _offset, const uint64_t _len) | |
: fd(_fd), offset(_offset), len(_len), buf(_buf) | |
{ | |
} | |
resume_res_t operator()(void) override | |
{ | |
BeginCoro(); | |
(void)pread64(fd, buf, len, offset); | |
EndCoro(); | |
} | |
}; | |
// A simple MD5 checksum coro | |
struct task_coroutine | |
: public coroutine | |
{ | |
const char *const path; | |
uint64_t fileSize, offset, upto, span; | |
CMD5 md5Factory; | |
uint8_t buf[1024]; | |
int fd; | |
task_coroutine(const char *const fullPath) | |
: path{fullPath} | |
{ | |
} | |
resume_res_t operator()(void) override | |
{ | |
BeginCoro(); | |
fd = open(path, O_RDONLY); | |
assert(fd != -1); | |
fileSize = lseek64(fd, 0, SEEK_END); | |
md5Factory.Init(); | |
for (offset = 0; offset != fileSize; ) | |
{ | |
upto = Min(fileSize, offset + 1024); | |
span = upto - offset; | |
// If we did have pread2v(), we could attempt read, and if it failed with EAGAIN, we 'd | |
// use WaitCor() which would (based on scheduler semantics) schedule it on another background thread | |
// see: https://lwn.net/Articles/612483/ | |
WaitCoro(new reader_coro(fd, buf, offset, span)); | |
md5Factory.Update(buf, span); | |
offset = upto; | |
} | |
(void)close(fd); | |
uint8_t digest[16]; | |
md5Factory.Finalize(digest); | |
Print(hex_fmt(digest, 16), "\n"); | |
EndCoro(); | |
} | |
}; | |
struct fetchcf_coro | |
: public coroutine | |
{ | |
}; | |
int main(int argc, char *argv[]) | |
{ | |
#if 0 | |
TheScheduler.Schedule(new dancer_coro()); | |
TheScheduler.Schedule(new callable_coro( | |
[](void) | |
{ | |
Print("Hello World\n"); | |
})); | |
TheScheduler.Schedule(new singer_coro()); | |
#endif | |
TheScheduler.Schedule(new task_coroutine("/etc/passwd")); | |
TheScheduler.Run(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment