Created
March 29, 2021 01:53
-
-
Save fakedrake/7cc913261357a8cb213ddd965eb248c5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <experimental/coroutine> | |
#include <stdio.h> | |
#include <assert.h> | |
using namespace std::experimental; | |
#define SCH_SIZE 10 | |
// The queue can run concurrently as long as | |
// | |
// write{i}(dist,n), load{i+n}(dist,n) | |
// | |
// are done in the correct order | |
// co_awat <expr> | |
// | |
// -------------- | |
// co_await(EXPR) { | |
// Awaitable && tmp = EXPR; | |
// if (!tmp.await_ready()) { | |
// tmp.await_suspend(coro_handle); | |
// <RETURN TO THE CALER> | |
// } | |
// return tmp.await_resume(); | |
// } | |
template<typename T,typename P> | |
struct prefetch_awaitable { | |
T& value; | |
prefetch_awaitable(T& value) : value(value) {} | |
~prefetch_awaitable() {} | |
bool await_ready () { return false; } | |
coroutine_handle<P> await_suspend (coroutine_handle<P> h) { | |
printf("_mm_prefetch(%p)\n", std::addressof(value)); | |
// _mm_prefetch(static_cast<const char*>(std::addressof(value))); | |
P& promise = h.promise(); | |
assert(promise.owner); | |
auto& sch = promise.owner->scheduler; | |
sch.push_back(h); | |
return sch.pop_front(); | |
} | |
T& await_resume () {return value;} | |
}; | |
// Collect tasks and start running when we get to the limit. | |
struct event_loop { | |
size_t limit; | |
explicit event_loop(size_t limit) : limit(limit) {} | |
struct base_task { | |
struct promise_type { | |
// We want to notify the owner when the task ends. | |
event_loop* owner = nullptr; | |
// What do when the coroutine starts (maybe suspend right there) | |
// suspend_always: always suspends and does not produce a value. | |
suspend_always initial_suspend() { return {}; } | |
// What do when the coroutine ends | |
// suspend_never: never suspends and does not produce a value. | |
suspend_never final_suspend() { | |
owner->on_task_done(); | |
return {}; | |
} | |
void return_void() {} | |
// This actually builds the base_task | |
base_task get_return_object() { | |
return {coroutine_handle<promise_type>::from_promise(*this)}; | |
} | |
void unhandled_exception() {} | |
}; | |
// Associate the coroutine with the event_loop event loop. | |
coroutine_handle<promise_type> set_owner(event_loop* owner) { | |
coroutine_handle<promise_type> result = m_hndl; | |
m_hndl.promise().owner = owner; | |
m_hndl = nullptr; | |
return result; | |
} | |
base_task(coroutine_handle<promise_type> h) : m_hndl(h) {} | |
~base_task() { if (m_hndl) m_hndl.destroy(); } | |
private: | |
coroutine_handle<promise_type> m_hndl; | |
}; | |
using coro_handle = coroutine_handle<typename base_task::promise_type>; | |
// A queue of coroutines | |
struct Scheduler { | |
Scheduler(const Scheduler& s) = delete; | |
Scheduler() : head(0), tail(0) {} | |
~Scheduler() {} | |
size_t head, tail; | |
coro_handle arr[SCH_SIZE]; | |
void push_back(coro_handle h) { | |
arr[head] = h; | |
head = (head+1) % SCH_SIZE; | |
} | |
coro_handle pop_front() { | |
coro_handle result = arr[tail]; | |
tail = (tail+1) % SCH_SIZE; | |
return result; | |
} | |
coro_handle try_pop_front () { | |
return tail != head ? pop_front() : coro_handle{}; | |
} | |
bool empty() const { return tail == head; } | |
void run() { | |
while (coro_handle h = try_pop_front()) | |
h.resume(); | |
} | |
}; | |
void join() { | |
scheduler.run(); | |
} | |
template<typename T,typename Fn, typename ...Args> | |
void spawn(Fn t, Args&&... params) { | |
// if all the required tasks are gathered run | |
if (limit == 0) { | |
// this will run all coroutines | |
scheduler.pop_front().resume(); | |
} | |
T hndl0 = t(std::forward<Args>(params)...); | |
auto h = hndl0.set_owner(this); | |
assert(h); | |
scheduler.push_back(h); | |
limit--; | |
} | |
void on_task_done() {++limit;} | |
Scheduler scheduler; | |
}; | |
// This will await on prefetch. | |
// | |
// co_await prefetch(variable); | |
// | |
// Issues a prefetch instruction and passes control to the scheduler. | |
template<typename T> | |
auto prefetch(T& value) { | |
return prefetch_awaitable<T,typename event_loop::base_task::promise_type>{value}; | |
} | |
int main () { | |
event_loop ev(3); | |
int val1 = 0; | |
int val2 = 0; | |
int val3 = 0; | |
for (int i = 0; i < 6; i++) { | |
ev.spawn<event_loop::base_task>([&] (int i) -> event_loop::base_task { | |
printf("[%d]Prefetching val1\n",i); | |
co_await prefetch<int>(val1); | |
printf("[%d]in cache val1, prefetching val2\n",i); | |
co_await prefetch<int>(val2); | |
printf("[%d]in cache val2, prefetching val3\n",i); | |
co_await prefetch<int>(val3); | |
printf("[%d]in cache val3\n",i); | |
co_return; | |
},i); | |
} | |
ev.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment