Skip to content

Instantly share code, notes, and snippets.

@fakedrake
Created March 29, 2021 01:53
Show Gist options
  • Save fakedrake/7cc913261357a8cb213ddd965eb248c5 to your computer and use it in GitHub Desktop.
Save fakedrake/7cc913261357a8cb213ddd965eb248c5 to your computer and use it in GitHub Desktop.
#include <experimental/coroutine>
#include <stdio.h>
#include <assert.h>
using namespace std::experimental;
#define SCH_SIZE 10
// The queue can run concurrently as long as
//
// write{i}(dist,n), load{i+n}(dist,n)
//
// are done in the correct order
// co_awat <expr>
//
// --------------
// co_await(EXPR) {
// Awaitable && tmp = EXPR;
// if (!tmp.await_ready()) {
// tmp.await_suspend(coro_handle);
// <RETURN TO THE CALER>
// }
// return tmp.await_resume();
// }
template<typename T,typename P>
struct prefetch_awaitable {
T& value;
prefetch_awaitable(T& value) : value(value) {}
~prefetch_awaitable() {}
bool await_ready () { return false; }
coroutine_handle<P> await_suspend (coroutine_handle<P> h) {
printf("_mm_prefetch(%p)\n", std::addressof(value));
// _mm_prefetch(static_cast<const char*>(std::addressof(value)));
P& promise = h.promise();
assert(promise.owner);
auto& sch = promise.owner->scheduler;
sch.push_back(h);
return sch.pop_front();
}
T& await_resume () {return value;}
};
// Collect tasks and start running when we get to the limit.
struct event_loop {
size_t limit;
explicit event_loop(size_t limit) : limit(limit) {}
struct base_task {
struct promise_type {
// We want to notify the owner when the task ends.
event_loop* owner = nullptr;
// What do when the coroutine starts (maybe suspend right there)
// suspend_always: always suspends and does not produce a value.
suspend_always initial_suspend() { return {}; }
// What do when the coroutine ends
// suspend_never: never suspends and does not produce a value.
suspend_never final_suspend() {
owner->on_task_done();
return {};
}
void return_void() {}
// This actually builds the base_task
base_task get_return_object() {
return {coroutine_handle<promise_type>::from_promise(*this)};
}
void unhandled_exception() {}
};
// Associate the coroutine with the event_loop event loop.
coroutine_handle<promise_type> set_owner(event_loop* owner) {
coroutine_handle<promise_type> result = m_hndl;
m_hndl.promise().owner = owner;
m_hndl = nullptr;
return result;
}
base_task(coroutine_handle<promise_type> h) : m_hndl(h) {}
~base_task() { if (m_hndl) m_hndl.destroy(); }
private:
coroutine_handle<promise_type> m_hndl;
};
using coro_handle = coroutine_handle<typename base_task::promise_type>;
// A queue of coroutines
struct Scheduler {
Scheduler(const Scheduler& s) = delete;
Scheduler() : head(0), tail(0) {}
~Scheduler() {}
size_t head, tail;
coro_handle arr[SCH_SIZE];
void push_back(coro_handle h) {
arr[head] = h;
head = (head+1) % SCH_SIZE;
}
coro_handle pop_front() {
coro_handle result = arr[tail];
tail = (tail+1) % SCH_SIZE;
return result;
}
coro_handle try_pop_front () {
return tail != head ? pop_front() : coro_handle{};
}
bool empty() const { return tail == head; }
void run() {
while (coro_handle h = try_pop_front())
h.resume();
}
};
void join() {
scheduler.run();
}
template<typename T,typename Fn, typename ...Args>
void spawn(Fn t, Args&&... params) {
// if all the required tasks are gathered run
if (limit == 0) {
// this will run all coroutines
scheduler.pop_front().resume();
}
T hndl0 = t(std::forward<Args>(params)...);
auto h = hndl0.set_owner(this);
assert(h);
scheduler.push_back(h);
limit--;
}
void on_task_done() {++limit;}
Scheduler scheduler;
};
// This will await on prefetch.
//
// co_await prefetch(variable);
//
// Issues a prefetch instruction and passes control to the scheduler.
template<typename T>
auto prefetch(T& value) {
return prefetch_awaitable<T,typename event_loop::base_task::promise_type>{value};
}
int main () {
event_loop ev(3);
int val1 = 0;
int val2 = 0;
int val3 = 0;
for (int i = 0; i < 6; i++) {
ev.spawn<event_loop::base_task>([&] (int i) -> event_loop::base_task {
printf("[%d]Prefetching val1\n",i);
co_await prefetch<int>(val1);
printf("[%d]in cache val1, prefetching val2\n",i);
co_await prefetch<int>(val2);
printf("[%d]in cache val2, prefetching val3\n",i);
co_await prefetch<int>(val3);
printf("[%d]in cache val3\n",i);
co_return;
},i);
}
ev.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment