Lockless mpmc queue
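// Fixed-size lockless multi-producer/multi-consumer (MPMC) queue of uint32_t indices.
// Producers claim a slot by advancing the monotonically increasing `in` counter with a
// compare-exchange; consumers claim one by advancing `out`. The counters are masked with
// QUEUE_MASK to index into the ring buffer. A slot value of 0 marks a free element,
// which is why stored indices are offset by one.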
#include <stdatomic.h>
#include <stdint.h>
#include <assert.h>
#include <stdbool.h>

typedef _Atomic(uint64_t) a64;
typedef _Atomic(uint32_t) a32;

#define atomicCompareExchange64(DESTPTR, COMPERAND, EXCHANGE) atomic_compare_exchange_weak(DESTPTR, COMPERAND, EXCHANGE)
#define atomicCompareExchange32(DESTPTR, COMPERAND, EXCHANGE) atomicCompareExchange64(DESTPTR, COMPERAND, EXCHANGE)
#define atomicLoad64(VALPTR) atomic_load(VALPTR)
#define atomicLoad32(VALPTR) atomic_load(VALPTR)

#define NUM_ELEMENTS 64
#define QUEUE_MASK (NUM_ELEMENTS - 1)

static struct { a64 out; a64 in; a32 elements[NUM_ELEMENTS]; } queue;
bool addToQueue(uint32_t index) {
    index += 1; // 0 is reserved to mark free entries, so indices are stored +1 on add and -1 again on pull
    uint64_t in;
    for (;;) {
        uint64_t out = atomicLoad64(&queue.out);
        in = atomicLoad64(&queue.in);
        uint64_t inNext = in + 1;
        // make sure the queue is not full: at most NUM_ELEMENTS entries may be in flight
        if (in - out >= NUM_ELEMENTS) {
            assert(!"Queue is full!");
            return false;
        }
        if (atomicCompareExchange64(&queue.in, &in, inNext)) {
            break;
        }
    }
    a32* task = queue.elements + (in & QUEUE_MASK);
    uint32_t expected = 0;
    while (!atomicCompareExchange32(task, &expected, index)) {
        // this entry has not been set back to zero on the other end of the queue yet; wait until it is
        expected = 0; // a failed compare-exchange overwrites expected, so reset it before retrying
    }
    return true;
}
enum {
    invalidIndex = 0xFFFFFFFF, // uint32_t max value
};
uint32_t pullFromQueue(void) {
    uint64_t in;
    uint64_t out;
    uint64_t outNext;
    for (;;) {
        // load in first
        in = atomicLoad64(&queue.in);
        out = atomicLoad64(&queue.out);
        outNext = out + 1;
        if (in <= out) {
            // return if there is no task
            return invalidIndex;
        }
        if (atomicCompareExchange64(&queue.out, &out, outNext)) {
            break;
        }
    }
    uint64_t relIdx = out & QUEUE_MASK;
    a32* task = queue.elements + relIdx;
    uint32_t index;
    for (;;) {
        index = atomicLoad32(task);
        if (index == 0) {
            // this queue entry is still being added, so we wait until it is done;
            // this spin is the one lock-like mechanism in this otherwise lockless implementation
            continue;
        }
        if (atomicCompareExchange32(task, &index, 0)) {
            break;
        }
    }
    return index - 1; // undo the +1 offset; 0 - 1 == invalidIndex is well-defined wrap-around for unsigned ints
}
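A minimal smoke test for the two functions above. The `main` below is not part of the gist and is single-threaded, so it does not exercise the concurrent paths; it only checks FIFO ordering and the empty-queue return value.

#include <stdio.h>

int main(void) {
    // push three work-item indices, then drain the queue
    for (uint32_t i = 0; i < 3; ++i) {
        if (!addToQueue(i)) {
            return 1; // queue unexpectedly full
        }
    }
    for (;;) {
        uint32_t idx = pullFromQueue();
        if (idx == invalidIndex) {
            break; // queue is empty
        }
        printf("pulled index %u\n", idx);
    }
    return 0;
}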