Skip to content

Instantly share code, notes, and snippets.

@markpapadakis
Last active June 21, 2020 09:43
Show Gist options
  • Save markpapadakis/eab3a2f75f1e1073f4b4 to your computer and use it in GitHub Desktop.
Save markpapadakis/eab3a2f75f1e1073f4b4 to your computer and use it in GitHub Desktop.
A very simple (first take) implementation of stack-less coroutines/actors
// https://gist.github.com/markpapadakis/8dba5c480c13b12a056e (example)
// https://medium.com/@markpapadakis/high-performance-services-using-coroutines-ac8e9f54d727
#include <switch.h>
#include <switch_print.h>
#include <switch_ll.h>
#include <switch_bitops.h>
#include <md5.h>
#include <text.h>
#include <network.h>
// Computed gotos for faster dispatch and lifted restrictions wrt to code that can be placed in switch {}
#define COROS_HAVE_COMPUTEDLABELSDISPATCH
class CorosScheduler;
// Caveats/gotchas:
// 1. You can't rely on the stack (these are stackless coroutines/actors), so you need to use coroutine members for persisting state
// e.g for (uint32_t i = 0; i != 10; ++i) { YieldCoro(); Print(i, "\n"); } will not work
// whereas if you have a member uint32_t i, then for (i = 0; i != 10; ++i) { YieldCoro(); Print(i, "\n"); } will work fine
//
// 2. If your coroutine calls a function/functor and it (or another function further down the call stack) wants to yield, then that
// function/functor must be converted into a coroutine instead and scheduled via WaitCoro()
//
// 3. Because labels are used as return points, you can't create/initialize vars in BeginCoro() .. EndCoro() unless
// they are inside { }. This shouldn't be much of an issue.
//
// Benefits:
// Very lightweight coroutines. Because they are stackless, they can run on any thread, and it's fairly trivial to optimize CorosScheduler::Run() to
// e.g consider the type of coro and move it to a background thread (e.g for disk I/O), dequeue from a global/per-thread coroutines queue (submitted work) etc
// Priorities allow for an interesting scheme; this is based on Linux Kernel O(1) scheduler implementation semantics.
//
// You can also use thread_local freelists of various coroutine instances, as opposed to deleting them when completed, if you
// are creating many 1000s/second and that becomes an issue. It's not implemented in this prototype, but it should be trivial to support that
// kind of reuse semantics.
struct coroutine
{
friend class CorosScheduler;
using runres_t = coroutine *;
#ifdef COROS_HAVE_COMPUTEDLABELSDISPATCH
using resume_token_t = const void *;
#else
using resume_token_t = uint16_t;
#endif
struct coroutine
{
protected:
switch_dlist schedulerList;
resume_token_t resumeToken{0};
coroutine *parent{nullptr};
// 8 different prioerities, from 0(lowest) to 7(highest)
// higher priority coros are executed first.
// e.g no coros of priority 0 will run unless there are no more runnable coros of priority 1 or higher
uint8_t prio{0};
#pragma mark BEGIN:Coroutiens API
#ifdef COROS_HAVE_COMPUTEDLABELSDISPATCH
// Place at the beginning of your operato() impl.
#define BeginCoro() if (!resumeToken) { resumeToken = &&___coroEntry; } else { goto *resumeToken; } ___coroEntry:
// Place at the end of your operator() impl.
#define EndCoro() return 0
#define YieldCoroImpl(res) resumeToken = &&__macro_concat(___coroLabel, __LINE__); return (runres_t)(res); __macro_concat(___coroLabel, __LINE__):
// Yield the coro, but next time it it's scheduled, it will be restarted as opposed to continue from where RestartCoro() has been invoked
// This is e.g useful for when you have a coro for network I/O, and you want to restart from beginning whenever it runs
#define RestartCoro() resumeToken = nullptr; return (runres_t)2
#else
#define BeginCoro() switch (resumeToken) { case 0:
#define EndCoro() } return 0
#define YieldCoroImpl(res) resumeToken = __LINE__; return (runres_t)(res); case __LINE__:
#define RestartCoro() resumeToken = 0; return (runres_t)2
#endif
// Exit coro - won't reschedule; will delete/free it
#define ExitCoro() return 0
// Yield to antother(if any) runnable coro.
// Will exit and return control back to the scheduler, which will place it back in runnable queues, but will
// choose another coro to run
#define YieldCoro() YieldCoroImpl(1)
// Will yield, but will not be placed back in the runnable queue sto run again
// Instead `c` will be scheduled and when it's done, then this coro will be placed back in the runnable queues to run again
#define WaitCoro(c) YieldCoroImpl(c)
#define WaitCoroWithPrio(c, p) YieldCoroImpl(coroutine::SetPrio(c, p))
// Will yield, but coro will not be placed in the runnable queues. It will not be deleted, and is
// expected to be scheduled back again later.
// Useful for when e.g you want to 'block' this thread until say a network event comes in in which case your network I/O logic matches it
// with this coro and schedules it back in. Useful for rate, special-case workloads
// WaitCoroWithPrio() is a handy macro for also setting priority to coro
#define FreezeCoro() YieldCoroImpl(3)
// You may want to designate other scalar/constants as return values for YieldCoroImpl(). e.g 5 for schedule to bg thread, or 10 for reschedule again after 1 minute, etc.
#pragma mark END:Coroutines API
public:
coroutine(void)
{
switch_dlist_init(&schedulerList);
}
virtual ~coroutine(void)
{
}
virtual runres_t operator()(void) = 0;
_ALWAYS_inline_ auto Priority(void) -> uint8_t
{
return prio;
}
void SetPriority(const uint8_t p)
{
assert(p < 8);
prio = p;
}
// See: WaitCoroWithPrio()
static _ALWAYS_inline_ coroutine *SetPrio(coroutine *const c, const uint8_t p)
{
c->SetPriority(p);
return c;
}
};
// An example scheduler implementation
// For an alternative impl. see Run() comments
// A more sophisticated scheduler would run runnable coros, and also dequeue from a coros queue submitted by other threads,
// support delayed re-scheduling, etc.
//
// The overhead compared to not using coros is <= 1us
// Example single-thread scheduler: keeps one runnable queue per priority and always runs the
// first coroutine of the highest-priority non-empty queue.
class CorosScheduler
{
private:
    switch_dlist corosList[8]; // Multiple runnable queues, for each supported prio
    uint32_t blockedCoros{0};  // How many coros are blocked(created, not runnable) waiting for a child coro to complete
    uint8_t runnableMask{0};   // Bit p is set while corosList[p] holds at least one coro

private:
    // Places `c` in the runnable queue that matches its current priority.
    void ScheduleCoro(coroutine *const c)
    {
        const auto p = c->Priority();

        if (unlikely(p >= 8))
        {
            // Low-priority; move to another background thread(e.g disk I/O operation)
            // IMPLEMENT_ME
            // NOTE(review): until this is implemented, `c` is neither queued nor freed here.
        }
        else
        {
            // NOTE(review): presumably links `c` at the tail of the queue (before the list head) - confirm against switch_ll.h
            switch_dlist_insert_before(&corosList[p], &c->schedulerList);
            runnableMask |= (1U << p);
        }
    }

    // Meant for handing a coro to this scheduler from another thread; see ProcessCompletedInAnotherThread()
    void ScheduleCoroUnsafe(coroutine *const c)
    {
        // TODO: IMPLEMENT ME
        (void)c;
    }

    // Selects the next runnable; will choose from the highest priority queue that has any runnables
    // Returns nullptr if nothing is runnable. The returned coro has been unlinked from its queue.
    coroutine *NextRunnable(void)
    {
        if (!runnableMask)
        {
            // Nothing in any runnable queue
            return nullptr;
        }

        // NOTE(review): assumes LeadingZeros() counts the leading zero bits of an 8-bit value, so that
        // 7 - result is the index of the highest set bit - confirm against switch_bitops.h
        const auto p = 7 - SwitchBitOps::LeadingZeros(runnableMask);
        auto &l = corosList[p];
        auto *const coro = switch_list_entry(coroutine, schedulerList, l.next);

        switch_dlist_del_and_reset(&coro->schedulerList);
        if (switch_dlist_isempty(&l))
        {
            // This runnable queue is now empty
            runnableMask &= ~(1U << p);
        }

        return coro;
    }

    // Single place where coroutines are released
    void FreeCoro(coroutine *const coro)
    {
        // TODO: maintain thread_local freeList for coro->Type()
        delete coro;
    }

    // Runs `coro` until it returns control, then acts on the value it returned.
    void RunCoro(coroutine *const coro)
    {
        const auto r = (*coro)();

        switch ((uintptr_t)r)
        {
            case 0:
                // Coro has ended
                if (coro->parent)
                {
                    // Has a parent waiting for this coro's completion to resume
                    --blockedCoros;
                    ScheduleCoro(coro->parent);
                }
                FreeCoro(coro);
                break;

            case 1: // Yielded, place back in runnable, but choose another coro now, if available. See YieldCoro()
            case 2: // Finished, but it wants to run again as soon as it can. See RestartCoro()
                ScheduleCoro(coro);
                break;

            case 3:
                // Coro is frozen. That is, it's not runnable, but we won't delete it
                // The idea is that another thread or another coro will eventually make it runnable again later
                // This is for some edge-cases where you need to freeze the coro, e.g run something on another thread(not as a coro) and then
                // notify the thread scheduler to schedule it back in
                // See FreezeCoro()
                break;

            default:
                // Coro is waiting for another coro, setup parenthood and schedule it (potentially to another thread?)
                // See WaitCoro()
                ++blockedCoros;
                r->parent = coro;
                r->resumeToken = 0;
                ScheduleCoro(r);
                break;
        }
    }

    // Runs one coroutine, if any is runnable. Returns false when there was nothing to run.
    bool RunNextRunnable(void)
    {
        if (auto *const coro = NextRunnable())
        {
            RunCoro(coro);
            return true;
        }

        return false;
    }

    inline bool AnyRunnable(void) const
    {
        return runnableMask != 0;
    }

    inline auto Blocked(void) const -> uint32_t
    {
        return blockedCoros;
    }

public:
    CorosScheduler(void)
    {
        for (auto &it : corosList)
            switch_dlist_init(&it);
    }

    // Run() is virtual, so make destruction through a base pointer well-defined too
    virtual ~CorosScheduler(void) = default;

    static void ScheduleInThreadScheduler(coroutine *const c)
    {
        // Schedule in current thread scheduler
        // TODO: implement me
        (void)c;
    }

    // Makes `c` runnable with priority `prio`, starting from the top of its body.
    // The scheduler frees `c` once it has ended.
    void Schedule(coroutine *const c, const uint8_t prio = 0)
    {
        // The priority has to be set before enqueueing: ScheduleCoro() picks the queue from c->Priority()
        c->SetPriority(prio);
        c->resumeToken = 0;
        ScheduleCoro(c);
    }

    // `c` ran on another thread and now it's done, and that thread handed it off back to us
    // We need to check if it has a parent, and if it does, make it runnable again(was waiting for `c`)
    void ProcessCompletedInAnotherThread(coroutine *const c)
    {
        if (auto *const parent = c->parent)
        {
            // submit into e.g thread-specific MPSQ queue to be dequeued later by e.g TryDequeSubmittedCoro() or DequeueSubmittedCoro()
            // NOTE(review): blockedCoros is not adjusted here, unlike the completion path in RunCoro() - confirm
            // where it should be decremented once ScheduleCoroUnsafe() is implemented
            ScheduleCoroUnsafe(parent);
        }

        // Release through FreeCoro(), same as coros that complete on this thread
        FreeCoro(c);
    }

    // Runs coroutines until none is runnable.
    virtual void Run(void)
    {
        // An alternative implementation of this method would
        // also try dequeuing from a thread-specific tasks queue, or a global queue, or whatever else
        // e.g
        //
        // for (;;)
        // {
        //     (void)RunNextRunnable();
        //
        //     if (AnyRunnable())
        //     {
        //         // At least one runnable, don't block waiting for external work
        //         if (auto *const c = TryDequeSubmittedCoro())
        //             ScheduleCoro(c);
        //     }
        //     else
        //     {
        //         // No runnables, block waiting for external work if needed
        //         ScheduleCoro(DequeueSubmittedCoro());
        //     }
        // }
        // See also: https://gist.github.com/markpapadakis/8dba5c480c13b12a056e
        //
        // We could also implement network I/O poll as another coro with the lowest priority which
        // returns control to the scheduler with RestartCoro(). See example in this file
        //
        // This is optimal for services that accept and manage connections I/O and also execute their requests
        while (RunNextRunnable())
        {
        }
    }
};
// https://twitter.com/ID_AA_Carmack/status/575788622554628096
// The one scheduler instance of this file; main() schedules coroutines on it and then calls Run().
static CorosScheduler TheScheduler;
struct singer_coro
: public coroutine
{
resume_res_t operator()(void) override
{
BeginCoro();
Print("Singing\n");
EndCoro();
}
};
struct diskreader_coro
: public coroutine
{
strwlen32_t *out; // dummy
diskreader_coro(strwlen32_t *const o)
: out(o)
{
}
resume_res_t operator()(void) override
{
BeginCoro();
out->Set(_S("Hello World"));
EndCoro();
}
};
struct dancer_coro
: public coroutine
{
strwlen32_t localBuf;
uint32_t i;
resume_res_t operator()(void) override
{
BeginCoro();
Print("Dancing\n");
YieldCoro();
Print("Did Sing!\n");
// Get some data into localBuf
// maybe this would block accessing the disk or whatever else.
// WaitCoro() will put this coro to sleep, waiting until another coro runs and then
// it's made runnable again
WaitCoro(new diskreader_coro(&localBuf));
Print("Got:", localBuf, "\n");
for (i = 0; i != 10; ++i)
{
Print("i = ", i, "\n");
if (i == 5)
ExitCoro();
else
YieldCoro();
}
EndCoro();
}
};
struct reader_coro
: public coroutine
{
int fd;
const uint64_t offset, len;
void *const buf;
reader_coro(int _fd, void *const _buf, const uint64_t _offset, const uint64_t _len)
: fd(_fd), offset(_offset), len(_len), buf(_buf)
{
}
resume_res_t operator()(void) override
{
BeginCoro();
(void)pread64(fd, buf, len, offset);
EndCoro();
}
};
// A simple MD5 checksum coro
struct task_coroutine
: public coroutine
{
const char *const path;
uint64_t fileSize, offset, upto, span;
CMD5 md5Factory;
uint8_t buf[1024];
int fd;
task_coroutine(const char *const fullPath)
: path{fullPath}
{
}
resume_res_t operator()(void) override
{
BeginCoro();
fd = open(path, O_RDONLY);
assert(fd != -1);
fileSize = lseek64(fd, 0, SEEK_END);
md5Factory.Init();
for (offset = 0; offset != fileSize; )
{
upto = Min(fileSize, offset + 1024);
span = upto - offset;
// If we did have pread2v(), we could attempt read, and if it failed with EAGAIN, we 'd
// use WaitCor() which would (based on scheduler semantics) schedule it on another background thread
// see: https://lwn.net/Articles/612483/
WaitCoro(new reader_coro(fd, buf, offset, span));
md5Factory.Update(buf, span);
offset = upto;
}
(void)close(fd);
uint8_t digest[16];
md5Factory.Finalize(digest);
Print(hex_fmt(digest, 16), "\n");
EndCoro();
}
};
// Placeholder coroutine type: adds no members and does not implement operator()().
struct fetchcf_coro : coroutine
{
};
// Demo entry point: schedules one MD5 task for /etc/passwd and runs the scheduler until nothing is runnable.
int main(int argc, char *argv[])
{
#if 0
    // Disabled examples
    TheScheduler.Schedule(new dancer_coro());
    TheScheduler.Schedule(new callable_coro(
        [](void)
        {
            Print("Hello World\n");
        }));
    TheScheduler.Schedule(new singer_coro());
#endif

    auto *const checksumTask = new task_coroutine("/etc/passwd");

    TheScheduler.Schedule(checksumTask);
    TheScheduler.Run();

    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment