#include <sched.h>
#include <stddef.h>
#include <stdlib.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
/////////////////////////////////////////////////////////////////////
// Global Queue Mutex Lock/Unlock
/////////////////////////////////////////////////////////////////////
#define MUTEX_LOCKED 0
#define MUTEX_UNLOCKED 1
#define MUTEX_SLEEPING 2
#define ACTIVE_SPIN 4
#define PASSIVE_SPIN 1
#define CPU_SPIN_COUNT 30
#define OsYield() sched_yield()
#define CpuYield() asm volatile("pause" ::: "memory")
#define FutexWake(ptr, num_threads) \
syscall(SYS_futex, ptr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, num_threads)
#define FutexWait(ptr, expected_value) \
syscall(SYS_futex, ptr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, expected_value, NULL)
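/////////////////////////////////////////////////////////////////////
// Assumed declarations (not shown in this gist)
/////////////////////////////////////////////////////////////////////
// The functions below reference Task, Worker, GlobalQueue, global_queue,
// workers, Panic, NUM_WORKERS and NUM_TASKS without defining them. The
// following is only a minimal sketch of what those definitions might look
// like so that the snippet is self-contained; the constants, the field
// layout, and the `run` callback are assumptions, not the author's code.
#include <stdio.h>
#define NUM_WORKERS 4  // assumed worker count
#define NUM_TASKS 256  // assumed per-worker ring buffer capacity (power of two)

typedef struct Task {
    struct Task* next;          // intrusive link used by the global queue and batch pushes
    void (*run)(struct Task*);  // hypothetical work callback
} Task;

typedef struct Worker {
    _Atomic(size_t) head;       // consumer index, shared with stealers
    _Atomic(size_t) tail;       // producer index, written only by the owning worker
    Task* tasks[NUM_TASKS];     // bounded ring buffer of pending tasks
    unsigned int xorshift_x, xorshift_y, xorshift_z; // per-worker PRNG state (seed non-zero)
} Worker;

typedef struct GlobalQueue {
    _Atomic(int) mutex;          // futex word guarding the linked list below
    _Atomic(unsigned int) size;  // task count, also used as a futex word by idle workers
    Task* head;
    Task* tail;
} GlobalQueue;

static GlobalQueue global_queue = { .mutex = MUTEX_UNLOCKED };
static Worker workers[NUM_WORKERS];

// assumed fatal-error helper: report the message and abort the process
static void Panic(const char* fmt, ...) {
    va_list args;
    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    fputc('\n', stderr);
    va_end(args);
    abort();
}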
void GlobalQueueUnlock() {
    // unlock the mutex and wake up a thread that was blocked on it, if any
    atomic_thread_fence(memory_order_release);
    switch (atomic_exchange_explicit(&global_queue.mutex, MUTEX_UNLOCKED, memory_order_relaxed)) {
    case MUTEX_UNLOCKED:
        Panic("Invalid global mutex state");
        break;
    case MUTEX_SLEEPING:
        FutexWake(&global_queue.mutex, 1);
        break;
    }
}
void GlobalQueueLock() {
    // try to speculatively grab the lock
    int value = atomic_exchange_explicit(&global_queue.mutex, MUTEX_LOCKED, memory_order_relaxed);
    if (value != MUTEX_UNLOCKED) {
        while (true) {
            // try to grab the lock, yielding the cpu on failure.
            // acquire with `value` (not MUTEX_LOCKED) so a previously observed MUTEX_SLEEPING state isn't lost
            for (int i = 0; i < ACTIVE_SPIN; i++) {
                int cmp = atomic_load_explicit(&global_queue.mutex, memory_order_relaxed);
                while (cmp == MUTEX_UNLOCKED)
                    if (atomic_compare_exchange_weak_explicit(&global_queue.mutex, &cmp, value, memory_order_acquire, memory_order_relaxed))
                        goto acquired;
                for (int spin = 0; spin < CPU_SPIN_COUNT; spin++)
                    CpuYield();
            }
            // try to grab the lock, yielding the os thread on failure
            for (int i = 0; i < PASSIVE_SPIN; i++) {
                int cmp = atomic_load_explicit(&global_queue.mutex, memory_order_relaxed);
                while (cmp == MUTEX_UNLOCKED)
                    if (atomic_compare_exchange_weak_explicit(&global_queue.mutex, &cmp, value, memory_order_acquire, memory_order_relaxed))
                        goto acquired;
                OsYield();
            }
            // high contention: sleep until woken up by GlobalQueueUnlock()
            value = atomic_exchange_explicit(&global_queue.mutex, MUTEX_SLEEPING, memory_order_relaxed);
            if (value == MUTEX_UNLOCKED)
                break;
            value = MUTEX_SLEEPING;
            FutexWait(&global_queue.mutex, MUTEX_SLEEPING);
        }
    }
acquired:
    // synchronize with any memory writes made by previous lock holders before continuing
    atomic_thread_fence(memory_order_acquire);
}
/////////////////////////////////////////////////////////////////////
// Global Queue Push / Pop
/////////////////////////////////////////////////////////////////////
void GlobalQueuePush(Task* head, Task* tail, size_t size) {
    // the global queue accepts premade linked lists of tasks, one batch at a time
    GlobalQueueLock();
    if (global_queue.tail != NULL) {
        global_queue.tail->next = head;
        global_queue.tail = tail;
    } else {
        global_queue.head = head;
        global_queue.tail = tail;
    }
    const size_t queue_size = atomic_load_explicit(&global_queue.size, memory_order_relaxed);
    atomic_store_explicit(&global_queue.size, queue_size + size, memory_order_relaxed);
    GlobalQueueUnlock();
    // wake up workers that went to sleep on an empty global queue (see WorkerGetTask)
    FutexWake(&global_queue.size, size);
}
void WorkerQueuePush(Worker* self, Task* task_list, size_t size);
Task* GlobalQueuePop(Worker* worker) {
    // exit early if the global queue is empty
    GlobalQueueLock();
    const size_t queue_size = atomic_load_explicit(&global_queue.size, memory_order_relaxed);
    if (queue_size == 0) {
        GlobalQueueUnlock();
        return NULL;
    }
    // grab a batch of tasks from the global queue proportional to the number of workers.
    // this way contended workers should still see some work after a GlobalQueuePop()
    size_t batch_size = (queue_size / NUM_WORKERS) + 1;
    if (batch_size > queue_size)
        batch_size = queue_size;
    if (batch_size > NUM_TASKS)
        batch_size = NUM_TASKS;
    Task* task_list = global_queue.head;
    for (size_t i = 0; i < batch_size; i++)
        global_queue.head = global_queue.head->next;
    if (global_queue.head == NULL)
        global_queue.tail = NULL;
    atomic_store_explicit(&global_queue.size, queue_size - batch_size, memory_order_relaxed);
    GlobalQueueUnlock();
    // return the first task and push the rest of the batch onto the worker's local queue
    Task* task = task_list;
    if (batch_size > 1)
        WorkerQueuePush(worker, task_list->next, batch_size - 1);
    return task;
}
/////////////////////////////////////////////////////////////////////
// Worker Queue Push/Pop/Steal
/////////////////////////////////////////////////////////////////////
#define STEAL_ATTEMPTS 4 // number of times to try and steal from other workers
#define CO_PRIME ((NUM_TASKS / 2) + 1) // an odd stride, and therefore co-prime with NUM_WORKERS when NUM_WORKERS is a power of two
// quickly generate a pseudo-random number using the worker's xorshift state
unsigned int WorkerNextRandomNumber(Worker* self) {
    self->xorshift_x ^= self->xorshift_x << 16;
    self->xorshift_x ^= self->xorshift_x << 5;
    self->xorshift_x ^= self->xorshift_x << 1;
    unsigned int temp = self->xorshift_x;
    self->xorshift_x = self->xorshift_y;
    self->xorshift_y = self->xorshift_z;
    self->xorshift_z = temp ^ self->xorshift_x ^ self->xorshift_z;
    return self->xorshift_z;
}
void WorkerQueuePush(Worker* self, Task* task_list, size_t size) {
    // bounded ring buffer, only ever filled by its owning worker:
    // load head with acquire to synchronize with consumers (pop/steal) that already took slots,
    // store tail with release to publish the new tasks to those consumers
    const size_t head = atomic_load_explicit(&self->head, memory_order_acquire);
    const size_t tail = atomic_load_explicit(&self->tail, memory_order_relaxed);
    if (head != tail)
        Panic("Only allowed to push to the worker queue when it's empty");
    for (size_t i = 0; i < size; i++) {
        self->tasks[(tail + i) % NUM_TASKS] = task_list;
        task_list = task_list->next;
    }
    atomic_store_explicit(&self->tail, tail + size, memory_order_release);
}
Task* WorkerQueuePop(Worker* self) {
    // head is shared with stealers, so claim a task by advancing it with a CAS;
    // tail is only written by this worker, so a relaxed load is enough
    while (true) {
        size_t head = atomic_load_explicit(&self->head, memory_order_acquire);
        const size_t tail = atomic_load_explicit(&self->tail, memory_order_relaxed);
        if (head == tail)
            return NULL;
        Task* task = self->tasks[head % NUM_TASKS];
        if (atomic_compare_exchange_weak_explicit(&self->head, &head, head + 1, memory_order_release, memory_order_relaxed))
            return task;
    }
}
Task* WorkerQueueSteal(Worker* self, Worker* victim) {
    while (true) {
        // synchronize with both the victim's consumers (potentially all workers) and its producer (the victim itself)
        size_t victim_head = atomic_load_explicit(&victim->head, memory_order_acquire);
        const size_t victim_tail = atomic_load_explicit(&victim->tail, memory_order_acquire);
        // try to steal half of the victim's tasks in one fell swoop
        size_t steal_size = victim_tail - victim_head;
        steal_size = steal_size - (steal_size / 2);
        if (steal_size == 0)
            return NULL;
        // copy the batch into our own queue (empty, since we only steal when out of local work),
        // then try to commit the steal by advancing the victim's head (see WorkerQueuePop)
        const size_t tail = atomic_load_explicit(&self->tail, memory_order_relaxed);
        for (size_t i = 0; i < steal_size; i++)
            self->tasks[(tail + i) % NUM_TASKS] = victim->tasks[(victim_head + i) % NUM_TASKS];
        if (!atomic_compare_exchange_weak_explicit(&victim->head, &victim_head, victim_head + steal_size, memory_order_release, memory_order_relaxed))
            continue;
        // successfully stole from the victim: return the last task and publish the rest locally
        steal_size -= 1;
        Task* task = self->tasks[(tail + steal_size) % NUM_TASKS];
        if (steal_size != 0)
            atomic_store_explicit(&self->tail, tail + steal_size, memory_order_release);
        return task;
    }
}
Task* WorkerGetTask(Worker* self) {
    // try to pop from the local queue first
    Task* task;
    if ((task = WorkerQueuePop(self)) != NULL)
        return task;
    while (true) {
        // local queue is empty, try to pop from the global queue instead
        if ((task = GlobalQueuePop(self)) != NULL)
            return task;
        // local and global queues are empty, try stealing a batch from other workers.
        // iterate over the workers in a random order to help balance out the load
        for (int i = 0; i < STEAL_ATTEMPTS; i++) {
            const unsigned int offset = WorkerNextRandomNumber(self);
            for (unsigned int index = 0; index < NUM_WORKERS; index++) {
                Worker* victim = &workers[((index * CO_PRIME) + offset) % NUM_WORKERS];
                if (victim == self) // can't steal from ourselves
                    continue;
                if ((task = WorkerQueueSteal(self, victim)) != NULL)
                    return task;
            }
        }
        // no other worker has any tasks either...
        // check whether the global queue received tasks while we were trying to steal
        if ((task = GlobalQueuePop(self)) != NULL)
            return task;
        // there truly aren't any tasks, so put the thread to sleep until more arrive
        while (atomic_load_explicit(&global_queue.size, memory_order_acquire) == 0)
            FutexWait(&global_queue.size, 0);
    }
}
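/////////////////////////////////////////////////////////////////////
// Example usage (not part of the original snippet)
/////////////////////////////////////////////////////////////////////
// A hypothetical driver showing how the pieces above are meant to fit
// together: each worker thread loops on WorkerGetTask() and runs whatever
// it receives, while Spawn() hands new tasks to the global queue, whose
// FutexWake in GlobalQueuePush wakes any sleeping workers. WorkerRun,
// Spawn, PrintHello, main and the PRNG seeds are illustrative assumptions;
// the gist defines no task callback or shutdown path, so the workers here
// simply run forever and the process exits from main.
#include <pthread.h>

// run loop for a single worker thread
static void* WorkerRun(void* arg) {
    Worker* self = (Worker*)arg;
    while (true) {
        Task* task = WorkerGetTask(self);
        task->run(task);
    }
    return NULL;
}

// submit a single task by pushing a one-element list onto the global queue
static void Spawn(Task* task) {
    task->next = NULL;
    GlobalQueuePush(task, task, 1);
}

static void PrintHello(Task* task) {
    (void)task;
    printf("hello from a worker\n");
}

int main(void) {
    pthread_t threads[NUM_WORKERS];
    for (int i = 0; i < NUM_WORKERS; i++) {
        // seed the per-worker xorshift state with arbitrary non-zero values
        workers[i].xorshift_x = (unsigned int)(i + 1);
        workers[i].xorshift_y = 0x9E3779B9u;
        workers[i].xorshift_z = 0x85EBCA6Bu;
        pthread_create(&threads[i], NULL, WorkerRun, &workers[i]);
    }
    static Task hello_tasks[16];
    for (int i = 0; i < 16; i++) {
        hello_tasks[i].run = PrintHello;
        Spawn(&hello_tasks[i]);
    }
    sleep(1); // give the workers a moment to drain the tasks, then exit
    return 0;
}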