#include <sched.h>
#include <stdlib.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <unistd.h>        // syscall()
#include <sys/syscall.h>
#include <linux/futex.h>
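
// ------------------------------------------------------------------
// NOTE: the original gist does not include the declarations below.
// This is a minimal, hypothetical sketch of what the rest of the file
// appears to assume; the names, sizes, and struct layouts are guesses
// made so the snippet compiles, not the author's actual definitions.
// ------------------------------------------------------------------
#define NUM_WORKERS 4   // assumed worker count (a power of two keeps CO_PRIME below valid)
#define NUM_TASKS 256   // assumed per-worker ring buffer capacity

typedef struct Task {
    struct Task* next;          // intrusive link used by the global queue's linked list
    void (*run)(struct Task*);  // hypothetical task entry point
} Task;

typedef struct Worker {
    _Atomic size_t head;        // producer index of the local ring buffer
    _Atomic size_t tail;        // consumer index of the local ring buffer
    Task* tasks[NUM_TASKS];     // ring buffer storage
    unsigned int xorshift_x, xorshift_y, xorshift_z; // PRNG state, seed with non-zero values
} Worker;

static struct {
    _Atomic int mutex;          // futex word guarding head/tail, starts out unlocked
    _Atomic unsigned int size;  // task count; 32 bits wide so it can double as a futex word
    Task* head;
    Task* tail;
} global_queue = { .mutex = 1 /* MUTEX_UNLOCKED, defined below */ };

static Worker workers[NUM_WORKERS];

// placeholder for the gist's (presumably noreturn) panic helper
static _Noreturn void Panic(const char* message) {
    (void)message;
    abort();
}
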
/////////////////////////////////////////////////////////////////////
// Global Queue Mutex Lock/Unlock
/////////////////////////////////////////////////////////////////////
#define MUTEX_LOCKED 0
#define MUTEX_UNLOCKED 1
#define MUTEX_SLEEPING 2

#define ACTIVE_SPIN 4
#define PASSIVE_SPIN 1
#define CPU_SPIN_COUNT 30

#define OsYield() sched_yield()
#define CpuYield() asm volatile("pause" ::: "memory")
#define FutexWake(ptr, num_threads) \
    syscall(SYS_futex, ptr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, num_threads)
#define FutexWait(ptr, expected_value) \
    syscall(SYS_futex, ptr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, expected_value, NULL)

void GlobalQueueUnlock() {
    // unlock the mutex and wake up anyone that was blocked on it
    atomic_thread_fence(memory_order_release);
    switch (atomic_exchange_explicit(&global_queue.mutex, MUTEX_UNLOCKED, memory_order_relaxed)) {
        case MUTEX_UNLOCKED:
            Panic("Invalid global mutex state");
        case MUTEX_SLEEPING:
            FutexWake(&global_queue.mutex, 1);
    }
}

void GlobalQueueLock() {
    // try to speculatively grab the lock
    int value = atomic_exchange_explicit(&global_queue.mutex, MUTEX_LOCKED, memory_order_relaxed);
    if (value != MUTEX_UNLOCKED) {
        while (true) {
            // try to grab the lock, yielding the cpu on failure
            for (int i = 0; i < ACTIVE_SPIN; i++) {
                int cmp = atomic_load_explicit(&global_queue.mutex, memory_order_relaxed);
                while (cmp == MUTEX_UNLOCKED)
                    if (atomic_compare_exchange_weak_explicit(&global_queue.mutex, &cmp, value, memory_order_acquire, memory_order_relaxed))
                        goto acquired;
                for (int spin = 0; spin < CPU_SPIN_COUNT; spin++)
                    CpuYield();
            }
            // try to grab the lock, yielding the os thread on failure
            for (int i = 0; i < PASSIVE_SPIN; i++) {
                int cmp = atomic_load_explicit(&global_queue.mutex, memory_order_relaxed);
                while (cmp == MUTEX_UNLOCKED)
                    if (atomic_compare_exchange_weak_explicit(&global_queue.mutex, &cmp, value, memory_order_acquire, memory_order_relaxed))
                        goto acquired;
                OsYield();
            }
            // high contention: mark the mutex as having sleepers so the unlocker
            // knows to wake us, then sleep until woken up through GlobalQueueUnlock()
            value = atomic_exchange_explicit(&global_queue.mutex, MUTEX_SLEEPING, memory_order_relaxed);
            if (value == MUTEX_UNLOCKED)
                goto acquired;
            value = MUTEX_SLEEPING;
            FutexWait(&global_queue.mutex, MUTEX_SLEEPING);
        }
    }
acquired:
    // synchronize any memory writes from previous lock holders before continuing
    atomic_thread_fence(memory_order_acquire);
}

/////////////////////////////////////////////////////////////////////
// Global Queue Push / Pop
/////////////////////////////////////////////////////////////////////
void GlobalQueuePush(Task* head, Task* tail, size_t size) {
    // the global queue accepts batches of pre-made linked lists at a time
    GlobalQueueLock();
    tail->next = NULL; // make sure the batch terminates the queue's linked list
    if (global_queue.tail != NULL) {
        global_queue.tail->next = head;
        global_queue.tail = tail;
    } else {
        global_queue.head = head;
        global_queue.tail = tail;
    }
    const size_t queue_size = atomic_load_explicit(&global_queue.size, memory_order_relaxed);
    atomic_store_explicit(&global_queue.size, queue_size + size, memory_order_relaxed);
    GlobalQueueUnlock();
    // wake any workers sleeping in WorkerGetTask() now that tasks are available
    FutexWake(&global_queue.size, NUM_WORKERS);
}

// forward declaration: defined in the worker queue section below
void WorkerQueuePush(Worker* self, Task* task_list, size_t size);

Task* GlobalQueuePop(Worker* worker) {
    // exit early if the global queue is empty
    GlobalQueueLock();
    const size_t queue_size = atomic_load_explicit(&global_queue.size, memory_order_relaxed);
    if (queue_size == 0) {
        GlobalQueueUnlock();
        return NULL;
    }
    // grab a batch of tasks from the global queue proportional to the number of workers.
    // this makes sure that contended workers will hopefully still see some work after a GlobalQueuePop()
    size_t batch_size = (queue_size / NUM_WORKERS) + 1;
    if (batch_size > NUM_TASKS)
        batch_size = NUM_TASKS;
    Task* task_list = global_queue.head;
    for (size_t i = 0; i < batch_size; i++)
        global_queue.head = global_queue.head->next;
    if (global_queue.head == NULL)
        global_queue.tail = NULL;
    atomic_store_explicit(&global_queue.size, queue_size - batch_size, memory_order_relaxed);
    GlobalQueueUnlock();
    // keep the first task for the caller and push the rest of the batch to the worker's local queue
    if (batch_size > 1)
        WorkerQueuePush(worker, task_list->next, batch_size - 1);
    return task_list;
}

/////////////////////////////////////////////////////////////////////
// Worker Queue Push/Pop/Steal
/////////////////////////////////////////////////////////////////////
#define STEAL_ATTEMPTS 4 // number of times to try and steal from other workers
#define CO_PRIME ((NUM_WORKERS / 2) + 1) // co-prime with NUM_WORKERS; trivially found when NUM_WORKERS is a power of two
// generate a random number quickly using the worker's xorshift state
// (Marsaglia's three-word xorshift generator; the state must be seeded non-zero)
unsigned int WorkerNextRandomNumber(Worker* self) {
    self->xorshift_x ^= self->xorshift_x << 16;
    self->xorshift_x ^= self->xorshift_x >> 5;
    self->xorshift_x ^= self->xorshift_x << 1;
    unsigned int temp = self->xorshift_x;
    self->xorshift_x = self->xorshift_y;
    self->xorshift_y = self->xorshift_z;
    self->xorshift_z = temp ^ self->xorshift_x ^ self->xorshift_y;
    return self->xorshift_z;
}

void WorkerQueuePush(Worker* self, Task* task_list, size_t size) {
    // single-producer side of the ring buffer:
    // load tail with acquire to receive changes made by consumers,
    // store head with release to publish the new tasks to consumers
    const size_t head = atomic_load_explicit(&self->head, memory_order_relaxed);
    const size_t tail = atomic_load_explicit(&self->tail, memory_order_acquire);
    if (head != tail)
        Panic("Only allowed to push to the worker queue when it's empty");
    for (size_t i = 0; i < size; i++) {
        self->tasks[(head + i) % NUM_TASKS] = task_list;
        task_list = task_list->next;
    }
    atomic_store_explicit(&self->head, head + size, memory_order_release);
}

Task* WorkerQueuePop(Worker* self) {
    // the consumer index (tail) is shared with stealers, so the pop has to be
    // committed with a compare-and-swap rather than a plain store
    size_t tail = atomic_load_explicit(&self->tail, memory_order_relaxed);
    while (true) {
        const size_t head = atomic_load_explicit(&self->head, memory_order_acquire);
        if (head == tail)
            return NULL;
        Task* task = self->tasks[tail % NUM_TASKS];
        if (atomic_compare_exchange_weak_explicit(&self->tail, &tail, tail + 1, memory_order_acq_rel, memory_order_relaxed))
            return task;
    }
}

Task* WorkerQueueSteal(Worker* self, Worker* victim) {
    while (true) {
        // synchronize with both the victim's consumers (potentially all workers) and its producer (the victim itself)
        size_t victim_tail = atomic_load_explicit(&victim->tail, memory_order_acquire);
        size_t victim_head = atomic_load_explicit(&victim->head, memory_order_acquire);
        // try to steal half of the victim's tasks in one fell swoop
        size_t steal_size = victim_head - victim_tail;
        steal_size = steal_size - (steal_size / 2);
        if (steal_size == 0)
            return NULL;
        // copy the batch into our own (empty) queue first, then commit the steal by
        // advancing the victim's consumer index (racing with WorkerQueuePop and other stealers)
        size_t head = atomic_load_explicit(&self->head, memory_order_relaxed);
        for (size_t i = 0; i < steal_size; i++)
            self->tasks[(head + i) % NUM_TASKS] = victim->tasks[(victim_tail + i) % NUM_TASKS];
        if (!atomic_compare_exchange_weak_explicit(&victim->tail, &victim_tail, victim_tail + steal_size, memory_order_acq_rel, memory_order_relaxed))
            continue;
        // successfully stole from the victim: keep the last stolen task for ourselves
        // and publish the remaining steal_size - 1 tasks to our local queue
        Task* task = self->tasks[(head + steal_size - 1) % NUM_TASKS];
        if (steal_size > 1)
            atomic_store_explicit(&self->head, head + steal_size - 1, memory_order_release);
        return task;
    }
}

Task* WorkerGetTask(Worker* self) {
    // try to pop from the local queue first
    Task* task;
    if ((task = WorkerQueuePop(self)) != NULL)
        return task;
    while (true) {
        // local queue is empty, try to pop from the global queue instead
        if ((task = GlobalQueuePop(self)) != NULL)
            return task;
        // local and global queues are empty, try stealing a batch from other workers.
        // balance out the workload by visiting the workers in a pseudo-random order
        for (int i = 0; i < STEAL_ATTEMPTS; i++) {
            const unsigned int offset = WorkerNextRandomNumber(self);
            for (unsigned int index = 0; index < NUM_WORKERS; index++) {
                Worker* victim = &workers[((index * CO_PRIME) + offset) % NUM_WORKERS];
                if (victim == self) // can't steal from ourselves
                    continue;
                if ((task = WorkerQueueSteal(self, victim)) != NULL)
                    return task;
            }
        }
        // no other workers have any tasks either...
        // check whether the global queue received any tasks while we were trying to steal
        if ((task = GlobalQueuePop(self)) != NULL)
            return task;
        // there truly aren't any tasks, so put the thread to sleep until
        // GlobalQueuePush() bumps the global queue size and wakes us up
        while (atomic_load_explicit(&global_queue.size, memory_order_acquire) == 0)
            FutexWait(&global_queue.size, 0);
    }
}
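
// ------------------------------------------------------------------
// NOTE: the driver below is not part of the original gist. It is only a
// hypothetical sketch of how the scheduler pieces above might be wired
// together; ExampleTask, WorkerRun and the pthread usage are illustrative
// assumptions, not the author's code.
// ------------------------------------------------------------------
#include <pthread.h>

static void ExampleTask(Task* task) {
    (void)task; // a real task would do some work and possibly push follow-up tasks
}

static void* WorkerRun(void* arg) {
    Worker* self = (Worker*)arg;
    // the xorshift state must be seeded with non-zero values, unique per worker
    self->xorshift_x = 123456789u + (unsigned int)(self - workers);
    self->xorshift_y = 362436069u;
    self->xorshift_z = 521288629u;
    while (true) {
        Task* task = WorkerGetTask(self); // blocks in FutexWait() while idle
        task->run(task);                  // uses the hypothetical Task.run field from the sketch above
    }
    return NULL;
}

int main(void) {
    // seed the global queue with a single example task, then start the workers;
    // after the task runs the workers simply park in WorkerGetTask()
    static Task first = { .next = NULL, .run = ExampleTask };
    GlobalQueuePush(&first, &first, 1);

    pthread_t threads[NUM_WORKERS];
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_create(&threads[i], NULL, WorkerRun, &workers[i]);
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(threads[i], NULL);
    return 0;
}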