#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <string.h>
#define streq(a, b) (!strcmp((a), (b)))
#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <setjmp.h>
#include <sched.h>
#include <unistd.h>
#include <pthread.h>
#include <xmmintrin.h>
#include <ucontext.h>
typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
typedef long word;
#define MAX_WORK_QUEUE_THREAD 64
#define FIBER_STACK_SIZE (64 * 1024)
#define MAX_SCHEDULER_WORKER 64
#define MAX_SCHEDULER_FIBERS 128
/* --------------------------------------------------------------
 *
 * SPINLOCK
 *
 * --------------------------------------------------------------*/
static void
spinlock_begin(volatile u32 *spinlock)
{/* acquire: spin until the lock word atomically goes from 0 (free) to 1 (held) */
 while (__sync_val_compare_and_swap(spinlock, 0, 1) != 0);}
static void
spinlock_end(volatile u32 *spinlock)
{_mm_sfence(); *spinlock = 0;}
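/* Usage sketch (illustrative, not part of the original API surface): the lock
 * word must start out zeroed, e.g.
 *
 *   static volatile u32 lock;
 *   spinlock_begin(&lock);
 *   ...touch shared state...
 *   spinlock_end(&lock);
 *
 * This is exactly how the scheduler below guards its wait and free lists. */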
/* --------------------------------------------------------------
 *
 * SEMAPHORE
 *
 * --------------------------------------------------------------*/
struct sem {
    pthread_mutex_t guard;
    pthread_cond_t cond;
    int count;
};
static void
sem_init(struct sem *s, int init)
{
    pthread_mutex_init(&s->guard, 0);
    pthread_cond_init(&s->cond, 0);
    s->count = init;
}
static void
sem_free(struct sem *s)
{
    pthread_mutex_destroy(&s->guard);
    pthread_cond_destroy(&s->cond);
}
static void
sem_post(struct sem *s, int delta)
{
    if (delta < 0) return;
    pthread_mutex_lock(&s->guard);
    s->count += delta;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->guard);
}
static void
sem_wait(struct sem *s, int delta)
{
    if (delta < 0) return;
    pthread_mutex_lock(&s->guard);
    do {
        if (s->count >= delta) {
            s->count -= delta;
            break;
        }
        pthread_cond_wait(&s->cond, &s->guard);
    } while (1);
    pthread_mutex_unlock(&s->guard);
}
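/* Note: this is a counting semaphore built from a mutex and a condition
 * variable. In this file producers call sem_post(&work_sem, 1) once per
 * pushed job and idle workers block in sem_wait(&work_sem, 1) until work
 * arrives; the broadcast wakes every waiter, which is simple but can cause
 * redundant wake-ups under contention. */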
/* --------------------------------------------------------------
 *
 * FIBER
 *
 * --------------------------------------------------------------*/
#undef _FORTIFY_SOURCE
typedef void(*fiber_callback)(void*);
struct sys_fiber {
    ucontext_t fib;
};
static void
fiber_create(struct sys_fiber *fib, void *stack, size_t stack_size,
    fiber_callback callback, void *data)
{
    getcontext(&fib->fib);
    fib->fib.uc_stack.ss_size = stack_size;
    fib->fib.uc_stack.ss_sp = stack;
    fib->fib.uc_link = 0;
    makecontext(&fib->fib, (void(*)())callback, 1, data);
}
static void
fiber_switch_to(struct sys_fiber *prev, struct sys_fiber *fib)
{
    swapcontext(&prev->fib, &fib->fib);
}
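/* These fibers are plain ucontext_t wrappers: fiber_create() prepares a
 * context on a caller-provided stack and fiber_switch_to() swapcontext()s
 * between two of them on the same thread. One caveat worth flagging: passing
 * a pointer through makecontext()'s variadic int arguments (as fiber_create
 * does) is unspecified by POSIX on LP64 platforms, although it commonly works
 * with glibc on x86-64, which is what this code appears to target. */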
/* --------------------------------------------------------------
 *
 * QUEUE
 *
 * --------------------------------------------------------------*/
/* This is an implementation of a multi-producer, multi-consumer non-blocking
 * concurrent FIFO queue based on the paper by Philippas Tsigas and Yi Zhang:
 * www.cse.chalmers.se/~tsigas/papers/latest-spaa01.pdf */
#define MAX_WORK_QUEUE_JOBS (1024)
#define WORK_QUEUE_MASK (MAX_WORK_QUEUE_JOBS-1)
struct scheduler;
typedef void(*job_callback)(struct scheduler*,void*);
#define JOB_ENTRY_POINT(name) static void name(struct scheduler *sched, void *arg)
#define BASE_ALIGN(x) __attribute__((aligned(x)))
#define QUEUE_EMPTY 0
#define QUEUE_REMOVED 1
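/* How the sentinels are used (a summary of the paper's scheme, as read from
 * the code below): a slot holding QUEUE_EMPTY (0) or QUEUE_REMOVED (1) counts
 * as free, so job pointers double as slot states. When a consumer removes an
 * element it writes back QUEUE_EMPTY or QUEUE_REMOVED depending on the low
 * tag bit of the stored pointer, and a producer that overwrites a REMOVED
 * slot tags the new pointer's low bit; job_queue_pop() masks that bit off
 * before returning the pointer. head and tail are only advanced on every
 * other slot, which roughly halves the contended compare-and-swaps on them.
 * Job pointers must therefore be at least 2-byte aligned, which static or
 * heap-allocated struct job objects are. */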
struct job {
    void *data;
    job_callback callback;
    volatile u32 *run_count;
};
struct job_queue {
    volatile u32 head;
    struct job *jobs[MAX_WORK_QUEUE_JOBS];
    volatile u32 tail;
};
static void
job_queue_init(struct job_queue *q)
{
    memset(q, 0, sizeof(*q));
    q->jobs[0] = QUEUE_EMPTY;
    q->head = 0;
    q->tail = 1;
}
static int
job_queue_entry_free(struct job *p)
{
    return (((uintptr_t)p == QUEUE_EMPTY) || ((uintptr_t)p == QUEUE_REMOVED));
}
static int
job_queue_push(struct job_queue *q, struct job *job)
{
    while (1) {
        /* read tail */
        u32 te = q->tail;
        u32 ate = te;
        struct job *tt = q->jobs[ate];
        u32 tmp = (ate + 1) & WORK_QUEUE_MASK;
        /* we want to find the actual tail */
        while (!(job_queue_entry_free(tt))) {
            /* check tail's consistency */
            if (te != q->tail) goto retry;
            /* check if queue is full */
            if (tmp == q->head) break;
            tt = q->jobs[tmp];
            ate = tmp;
            tmp = (ate + 1) & WORK_QUEUE_MASK;
        }
        /* check tail's consistency */
        if (te != q->tail) continue;
        /* check if queue is full */
        if (tmp == q->head) {
            ate = (tmp + 1) & WORK_QUEUE_MASK;
            tt = q->jobs[ate];
            if (!(job_queue_entry_free(tt)))
                return 0; /* queue is full */
            /* help pop update the head */
            __sync_bool_compare_and_swap(&q->head, tmp, ate);
            continue;
        }
        if ((uintptr_t)tt == QUEUE_REMOVED)
            job = (struct job*)((uintptr_t)job | 0x01);
        if (te != q->tail) continue;
        if (__sync_bool_compare_and_swap(&q->jobs[ate], tt, job)) {
            if ((tmp & 1) == 0)
                __sync_bool_compare_and_swap(&q->tail, te, tmp);
            return 1;
        }
    retry:;
    }
}
static int
job_queue_pop(struct job **job, struct job_queue *q)
{
    while (1) {
        u32 th = q->head;
        u32 tmp = (th + 1) & WORK_QUEUE_MASK;
        struct job *tt = q->jobs[tmp];
        struct job *tnull = 0;
        /* we want to find the actual head */
        while ((job_queue_entry_free(tt))) {
            if (th != q->head) goto retry;
            if (tmp == q->tail) return 0;
            tmp = (tmp + 1) & WORK_QUEUE_MASK;
            tt = q->jobs[tmp];
        }
        /* check head's consistency */
        if (th != q->head) continue;
        /* check if queue is empty */
        if (tmp == q->tail) {
            /* help push to update end */
            __sync_bool_compare_and_swap(&q->tail, tmp, (tmp+1) & WORK_QUEUE_MASK);
            continue; /* retry */
        }
        tnull = (((uintptr_t)tt & 0x01) ? (struct job*)QUEUE_REMOVED: (struct job*)QUEUE_EMPTY);
        if (th != q->head) continue;
        /* get actual head */
        if (__sync_bool_compare_and_swap(&q->jobs[tmp], tt, tnull)) {
            if ((tmp & 0x1) == 0)
                __sync_bool_compare_and_swap(&q->head, th, tmp);
            *job = (struct job*)((uintptr_t)tt & ~(uintptr_t)1);
            return 1;
        }
    retry:;
    }
}
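/* Usage sketch (illustrative): the queue stores struct job pointers and never
 * owns them; job_queue_push() returns 0 when the ring is full, which is why
 * scheduler_run() below keeps retrying until the push succeeds.
 *
 *   struct job_queue q;
 *   struct job j = { my_arg, my_callback, &my_counter };
 *   struct job *out;
 *   job_queue_init(&q);
 *   if (job_queue_push(&q, &j)) { ...one job queued... }
 *   if (job_queue_pop(&out, &q)) out->callback(s, out->data);
 *
 * s, my_callback, my_arg and my_counter are placeholders for a scheduler,
 * any JOB_ENTRY_POINT function, its argument and a run counter. */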
/* --------------------------------------------------------------
 *
 * SCHEDULER
 *
 * --------------------------------------------------------------*/
typedef volatile u32 job_counter;
enum job_queue_ids {
    JOB_QUEUE_LOW,
    JOB_QUEUE_NORMAL,
    JOB_QUEUE_HIGH,
    JOB_QUEUE_COUNT
};
struct scheduler_fiber {
    struct scheduler_fiber *next;
    struct scheduler_fiber *prev;
    void *stack;
    size_t stack_size;
    struct sys_fiber handle;
    struct job job;
    u32 value;
};
struct scheduler_worker {
    int id;
    pthread_t thread;
    struct scheduler_fiber *context;
    struct scheduler *sched;
};
typedef void (*scheduler_profiler_callback_f)(void*, int thread_id);
struct scheduler_profiling {
    void *userdata;
    scheduler_profiler_callback_f thread_start;
    scheduler_profiler_callback_f thread_stop;
    scheduler_profiler_callback_f context_switch;
    scheduler_profiler_callback_f wait_start;
    scheduler_profiler_callback_f wait_stop;
};
struct scheduler {
    struct sem work_sem;
    struct job_queue queue[JOB_QUEUE_COUNT];
    int worker_count;
    struct scheduler_worker worker[MAX_SCHEDULER_WORKER];
    volatile u32 worker_running;
    volatile u32 worker_active;
    struct scheduler_profiling profiler;
    int fiber_count;
    struct scheduler_fiber fibers[MAX_SCHEDULER_FIBERS];
    volatile u32 wait_list_lock;
    struct scheduler_fiber *wait_list;
    volatile u32 free_list_lock;
    struct scheduler_fiber *free_list;
};
static struct job
Job(job_callback callback, void *data)
{
    struct job task;
    task.callback = callback;
    task.data = data;
    task.run_count = 0;
    return task;
}
static void
scheduler_hook_into_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
{
    spinlock_begin(lock);
    if (!*list) {
        *list = element;
        element->prev = 0;
        element->next = 0;
    } else {
        element->prev = 0;
        element->next = *list;
        (*list)->prev = element;
        *list = element;
    }
    spinlock_end(lock);
}
static void
scheduler_unhook_from_list(struct scheduler_fiber **list,
    struct scheduler_fiber *element, volatile u32 *lock)
{
    spinlock_begin(lock);
    if (element->next)
        element->next->prev = element->prev;
    if (element->prev)
        element->prev->next = element->next;
    if (*list == element)
        *list = element->next;
    element->next = element->prev = 0;
    spinlock_end(lock);
}
static struct scheduler_fiber*
scheduler_find_fiber_finished_waiting(struct scheduler *s)
{
    struct scheduler_fiber *iter;
    spinlock_begin(&s->wait_list_lock);
    iter = s->wait_list;
    while (iter) {
        if (*iter->job.run_count == iter->value) break;
        iter = iter->next;
    }
    spinlock_end(&s->wait_list_lock);
    return iter;
}
static struct scheduler_fiber*
scheduler_get_free_fiber(struct scheduler *s)
{
    struct scheduler_fiber *fib = 0;
    spinlock_begin(&s->free_list_lock);
    if (s->fiber_count < MAX_SCHEDULER_FIBERS) {
        fib = &s->fibers[s->fiber_count++];
    } else if (s->free_list) {
        fib = s->free_list;
    }
    spinlock_end(&s->free_list_lock);
    return fib;
}
static void
scheduler_run(struct scheduler *s, enum job_queue_ids q, struct job *jobs,
    u32 count, job_counter *counter)
{
    u32 jobIndex = 0;
    struct job_queue *queue;
    assert(q < JOB_QUEUE_COUNT);
    assert(counter);
    assert(jobs);
    assert(s);
    queue = &s->queue[q];
    /* publish the expected job count before any job can finish and decrement
     * it, otherwise a fast worker's decrement could be lost */
    *counter = count;
    while (jobIndex < count) {
        jobs[jobIndex].run_count = counter;
        if (job_queue_push(queue, &jobs[jobIndex])) {
            sem_post(&s->work_sem, 1);
            jobIndex++;
        }
    }
}
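/* Typical call pattern (mirrors the TEST section at the bottom of the file):
 *
 *   job_counter counter = 0;
 *   struct job batch[2] = { Job(cb_a, &a), Job(cb_b, &b) };
 *   scheduler_run(s, JOB_QUEUE_NORMAL, batch, 2, &counter);
 *   ...do other work...
 *   scheduler_wait_for(s, &counter, 0);
 *
 * cb_a/cb_b/a/b are placeholders; any JOB_ENTRY_POINT function and argument
 * pair works. The jobs array must stay alive until the counter reaches the
 * waited-for value because the queue stores pointers into it. */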
static void
fiber_proc(void *arg)
{
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler *s = w->sched;
    __sync_add_and_fetch(&s->worker_active, 1);
    while (1) {
        /* check if any fiber is done waiting */
        struct scheduler_fiber *fiber = scheduler_find_fiber_finished_waiting(s);
        if (fiber) {
            /* put old worker context into the free list, but keep its stack
             * allocation so the fiber can be reused by fiber_create() later */
            struct scheduler_fiber *old = w->context;
            void *stack = old->stack;
            size_t stack_size = old->stack_size;
            memset(old, 0, sizeof(*old));
            old->stack = stack;
            old->stack_size = stack_size;
            scheduler_unhook_from_list(&s->wait_list, fiber, &s->wait_list_lock);
            scheduler_hook_into_list(&s->free_list, old, &s->free_list_lock);
            /* set previously waiting fiber as worker context */
            w->context = fiber;
            if (s->profiler.context_switch)
                s->profiler.context_switch(s->profiler.userdata, w->id);
            fiber_switch_to(&old->handle, &w->context->handle);
        }
        /* check if any new jobs are inside the work queues */
        {struct job *job = 0;
        if (!(job_queue_pop(&job, &s->queue[JOB_QUEUE_HIGH])) &&
            !(job_queue_pop(&job, &s->queue[JOB_QUEUE_NORMAL])) &&
            !(job_queue_pop(&job, &s->queue[JOB_QUEUE_LOW]))) {
            /* currently no job so wait */
            __sync_sub_and_fetch(&s->worker_active, 1);
            if (s->profiler.wait_start)
                s->profiler.wait_start(s->profiler.userdata, w->id);
            sem_wait(&s->work_sem, 1);
            if (s->profiler.wait_stop)
                s->profiler.wait_stop(s->profiler.userdata, w->id);
            __sync_add_and_fetch(&s->worker_active, 1);
        } else {
            /* run dequeued job */
            w->context->job = *job;
            assert(job->callback);
            if (s->profiler.thread_start)
                s->profiler.thread_start(s->profiler.userdata, w->id);
            job->callback(s, job->data);
            if (s->profiler.thread_stop)
                s->profiler.thread_stop(s->profiler.userdata, w->id);
            __sync_sub_and_fetch(job->run_count, 1);
        }}
    }
}
static void*
thread_proc(void *arg)
{
    struct scheduler_worker *w = (struct scheduler_worker*)arg;
    struct scheduler_fiber *fiber;
    struct scheduler *s = w->sched;
    __sync_add_and_fetch(&s->worker_running, 1);
    /* create dummy fiber */
    fiber = scheduler_get_free_fiber(s);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    w->context = fiber;
    fiber_proc(w);
    return 0;
}
static void
scheduler_wait_for(struct scheduler *s, job_counter *counter, u32 value)
{
    struct scheduler_worker *w = 0;
    struct scheduler_fiber *old;
    assert(s);
    assert(counter);
    /* find this thread's own worker state */
    {int worker_index = 0;
    pthread_t self = pthread_self();
    for (; worker_index < s->worker_count; ++worker_index) {
        if (pthread_equal(s->worker[worker_index].thread, self)) {
            w = &s->worker[worker_index];
            break;
        }
    }}
    assert(w);
    /* insert current context into the waiting list */
    old = w->context;
    w->context->value = value;
    w->context->job.run_count = counter;
    scheduler_hook_into_list(&s->wait_list, old, &s->wait_list_lock);
    /* either continue a fiber that finished waiting or start a new one */
    w->context = scheduler_find_fiber_finished_waiting(s);
    if (!w->context) {
        w->context = scheduler_get_free_fiber(s);
        assert(w->context);
        scheduler_unhook_from_list(&s->free_list, w->context, &s->free_list_lock);
        fiber_create(&w->context->handle, w->context->stack, w->context->stack_size, fiber_proc, w);
    } else scheduler_unhook_from_list(&s->wait_list, w->context, &s->wait_list_lock);
    if (s->profiler.context_switch)
        s->profiler.context_switch(s->profiler.userdata, w->id);
    fiber_switch_to(&old->handle, &w->context->handle);
}
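/* scheduler_wait_for() must only be called from code already running on one
 * of the scheduler's workers (a job started via scheduler_run, or the main
 * thread after sched_init registered it as a worker). It parks the calling
 * fiber on the wait list and switches to another fiber, so locals on the
 * current stack stay alive, but execution only resumes once *counter has
 * dropped to the given value. */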
static void
sched_init(struct scheduler *sched, size_t worker_count)
{
    size_t thread_index = 0;
    pthread_attr_t attr;
    assert(sched);
    assert(worker_count);
    assert(worker_count <= MAX_SCHEDULER_WORKER);
    memset(sched, 0, sizeof(*sched));
    sched->worker_count = (int)worker_count;
    /* init semaphore and job queues */
    sem_init(&sched->work_sem, 0);
    job_queue_init(&sched->queue[0]);
    job_queue_init(&sched->queue[1]);
    job_queue_init(&sched->queue[2]);
    /* init fibers */
    {int fiber_index = 0;
    for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
        struct scheduler_fiber *fiber = sched->fibers + fiber_index;
        fiber->stack_size = FIBER_STACK_SIZE;
        fiber->stack = calloc(fiber->stack_size, 1);
    }}
    /* start worker threads */
    pthread_attr_init(&attr);
    for (; thread_index < worker_count-1; ++thread_index) {
        cpu_set_t cpus;
        sched->worker[thread_index].id = (int)thread_index;
        sched->worker[thread_index].sched = sched;
        /* bind thread to core */
        CPU_ZERO(&cpus);
        CPU_SET(thread_index, &cpus);
        pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpus);
        /* start worker thread */
        pthread_create(&sched->worker[thread_index].thread, &attr, thread_proc, &sched->worker[thread_index]);
        pthread_detach(sched->worker[thread_index].thread);
    }
    /* initialize main thread as worker thread */
    {cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(thread_index, &cpus);
    sched->worker[thread_index].sched = sched;
    sched->worker[thread_index].thread = pthread_self();
    sched->worker[thread_index].id = (int)thread_index;
    pthread_setaffinity_np(sched->worker[thread_index].thread, sizeof(cpu_set_t), &cpus);}
    /* create fiber for main thread worker */
    {struct scheduler_fiber *fiber;
    fiber = scheduler_get_free_fiber(sched);
    assert(fiber);
    getcontext(&fiber->handle.fib);
    fiber->handle.fib.uc_link = 0;
    fiber->handle.fib.uc_stack.ss_size = 0;
    fiber->handle.fib.uc_stack.ss_sp = 0;
    sched->worker[thread_index].context = fiber;}
    pthread_attr_destroy(&attr);
}
static void
sched_free(struct scheduler *sched)
{
    /* free fiber stacks */
    {int fiber_index = 0;
    for (; fiber_index < MAX_SCHEDULER_FIBERS; ++fiber_index) {
        struct scheduler_fiber *fiber = sched->fibers + fiber_index;
        free(fiber->stack);
    }}
    sem_free(&sched->work_sem);
}
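/* Note: the worker threads are detached and never told to shut down, so
 * sched_free() only releases the fiber stacks and the semaphore. Calling it
 * while jobs are still queued or running would free stacks that workers may
 * still be executing on; it is only meant to be called once every submitted
 * counter has been waited on. */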
/* --------------------------------------------------------------
 *
 * TEST
 *
 * --------------------------------------------------------------*/
struct game_state {
    int data;
};
struct test_data {
    int *data;
    int from;
    int to;
};
JOB_ENTRY_POINT(test_work)
{
    int i;
    struct test_data *data = arg;
    for (i = data->from; i < data->to; ++i)
        data->data[i] = i;
    sleep(1);
}
JOB_ENTRY_POINT(root)
{
    struct game_state *game = arg;
    job_counter counter = 0;
    struct job jobs[8];
    struct test_data data[8];
    int i, n[2*1024];
    (void)game;
    printf("root\n");
    for (i = 0; i < 8; ++i) {
        data[i].data = n;
        data[i].from = i * 256;
        data[i].to = (i+1)*256;
        jobs[i] = Job(test_work, &data[i]);
    }
    scheduler_run(sched, JOB_QUEUE_HIGH, jobs, 8, &counter);
    printf("run\n");
    scheduler_wait_for(sched, &counter, 0);
    printf("done\n");
}
int main(int argc, char **argv)
{
    struct game_state app;
    struct job job = Job(root, &app);
    job_counter counter = 0;
    /* start root process */
    struct scheduler sched;
    size_t thread_count = (size_t)sysconf(_SC_NPROCESSORS_ONLN);
    (void)argc; (void)argv;
    sched_init(&sched, thread_count);
    printf("init\n");
    scheduler_run(&sched, JOB_QUEUE_HIGH, &job, 1, &counter);
    printf("run\n");
    scheduler_wait_for(&sched, &counter, 0);
    printf("finished\n");
    return 0;
}
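/* Build sketch (assumed, not part of the gist): on Linux/glibc something like
 *
 *   cc -O2 -pthread -D_GNU_SOURCE fiber_scheduler.c -o fiber_scheduler
 *
 * should work; -D_GNU_SOURCE is the supported way to expose cpu_set_t,
 * CPU_SET() and pthread_attr_setaffinity_np() instead of defining __USE_GNU
 * by hand as the header block at the top does. The file name is a
 * placeholder. */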