Last active
August 1, 2022 07:26
-
-
Save jserv/f810c45ad4423f406f9e0dbe9dabadc9 to your computer and use it in GitHub Desktop.
Lock-free multiple-producer (MP) / multiple-consumer (MC) ring buffer (**incomplete**)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
// Parameters for smp_fence(): each names the ordering that must hold between
// earlier accesses (first kind) and later accesses (second kind).
#define LoadStore 0x12
#define StoreLoad 0x21
#if defined(__x86_64__)
/* Emit the cheapest x86-64 fence that provides the orderings in 'mask'.
 * Only StoreLoad needs a real instruction (mfence); any other non-empty
 * request is satisfied by a compiler barrier. A zero mask emits nothing.
 */
static inline void smp_fence(unsigned int mask)
{
    if (mask == 0)
        return;
    if ((mask & StoreLoad) != StoreLoad) {
        /* Any fence but StoreLoad */
        __asm__ volatile("" ::: "memory");
        return;
    }
    __asm__ volatile("mfence" ::: "memory");
}
#else
#error "Unsupported architecture"
#endif
#include <stdbool.h>
#include <stdint.h>

#include "common.h"
#if defined(__x86_64__)
/* A 128-bit value viewed as two 64-bit halves ('lo' first on little-endian
 * x86-64), matching the RDX:RAX and RCX:RBX pairs used by cmpxchg16b.
 */
union u128 {
    struct {
        uint64_t lo, hi;
    } s;
    __int128 ui;
};
/* Double-width (16-byte) compare-and-swap.
 *
 * Atomically compares '*var' with '*exp'. If they are equal, 'neu' is stored
 * into '*var' and true is returned. Otherwise false is returned and the
 * value observed in '*var' is written back into '*exp', so the caller can
 * retry without reloading. The lock prefix makes this a full barrier on x86,
 * and the "memory" clobber makes it a compiler barrier as well.
 *
 * '*var' must be 16-byte aligned: cmpxchg16b faults on misaligned operands.
 */
static inline bool lf_compare_exchange(__int128 *var,
                                       __int128 *exp,
                                       __int128 neu)
{
    union u128 cmp = {.ui = *exp}, with = {.ui = neu};
    bool ret;
    __asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0"
                         : "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo)
                         : "c"(with.s.hi), "b"(with.s.lo)
                         : "cc", "memory");
    /* On failure cmpxchg16b loaded the current contents into RDX:RAX. */
    if (UNLIKELY(!ret))
        *exp = cmp.ui;
    return ret;
}
#else
#error "Unsupported architecture"
#endif
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
/* Cache line size in bytes, used to align and separate shared data.
 * Defaults to 64; override at build time with -DCACHE_LINE=<n>. The value is
 * used as an alignment, so it must be a power of two.
 */
#ifndef CACHE_LINE
#define CACHE_LINE 64
#endif
#define INIT_FUNCTION __attribute__((constructor))
/* Branch-prediction hints; both normalize their argument to 0 or 1. */
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
#define ALIGNED(x) __attribute__((__aligned__(x)))
#if __STDC_VERSION__ >= 201112L
#define THREAD_LOCAL _Thread_local /* C11 */
#else
#define THREAD_LOCAL __thread /* GNU extension */
#endif
/* Smallest power of two >= x, as unsigned long. 0 and 1 both yield 1.
 * If the result does not fit in unsigned long, this evaluates to 0 instead
 * of performing an out-of-range (undefined) shift.
 */
#define ROUNDUP_POW2(x)                                                    \
    ({                                                                     \
        unsigned long _rp2_x = (x);                                        \
        unsigned int _rp2_bits = __SIZEOF_LONG__ * __CHAR_BIT__;           \
        unsigned int _rp2_shift =                                          \
            _rp2_x > 1                                                     \
                ? _rp2_bits - (unsigned int) __builtin_clzl(_rp2_x - 1UL)  \
                : 0U;                                                      \
        _rp2_shift >= _rp2_bits ? 0UL : (1UL << _rp2_shift);               \
    })
/* Round 'a' up to the next multiple of 'b' (b != 0). Each argument is
 * evaluated once. The temporaries are named distinctly to avoid capturing
 * caller variables or clashing with other statement-expression macros.
 */
#define ROUNDUP(a, b)                                 \
    ({                                                \
        __typeof__(a) _ru_a = (a);                    \
        __typeof__(b) _ru_b = (b);                    \
        ((_ru_a + _ru_b - 1) / _ru_b) * _ru_b;        \
    })
/* Integer type two pointers wide: the operand of the double-width
 * compare-and-swap that updates a (pointer, index) pair as one unit.
 */
#if __SIZEOF_POINTER__ != 4
typedef __int128 ptrpair_t; /* 64-bit pointers: 128-bit pair */
#else
typedef unsigned long long ptrpair_t; /* assume 64 bits */
#endif
#include <stdlib.h> | |
/* Allocate 'size' bytes aligned to 'alignment'.
 *
 * An alignment of 0 or 1 means "no particular alignment" and uses malloc().
 * Otherwise 'alignment' should be a power of two accepted by aligned_alloc().
 * aligned_alloc() requires the size to be a multiple of the alignment, so
 * the request is rounded up first. Returns NULL on failure, including when
 * that rounding would overflow size_t; previously the wrapped size could
 * yield an undersized block. Release the result with osal_free().
 */
static inline void *osal_alloc(size_t size, size_t alignment)
{
    if (alignment <= 1)
        return malloc(size);
    size_t slack = alignment - 1;
    if (size > (size_t) -1 - slack)
        return NULL; /* size + slack would wrap around */
    size_t rounded = (size + slack) / alignment * alignment;
    return aligned_alloc(alignment, rounded);
}
/* Release memory obtained from osal_alloc(); NULL is accepted. */
static inline void osal_free(void *ptr)
{
    free(ptr);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <assert.h> | |
#include <inttypes.h> | |
#include <stdbool.h> | |
#include <stdlib.h> | |
#include "arch.h" | |
#include "common.h" | |
#include "lfring.h" | |
/* Every flag bit that lfring_alloc() accepts. */
#define SUPPORTED_FLAGS \
    (LFRING_FLAG_MP | LFRING_FLAG_SP | LFRING_FLAG_MC | LFRING_FLAG_SC)
/* Smaller of 'a' and 'b'; each argument is evaluated exactly once. */
#define MIN(a, b)                              \
    ({                                         \
        __typeof__(a) _min_a = (a);            \
        __typeof__(b) _min_b = (b);            \
        _min_a < _min_b ? _min_a : _min_b;     \
    })
typedef uintptr_t ringidx_t; | |
struct element { | |
void *ptr; | |
uintptr_t idx; | |
}; | |
struct lfring { | |
ringidx_t head; | |
ringidx_t tail ALIGNED(CACHE_LINE); | |
uint32_t mask; | |
uint32_t flags; | |
struct element ring[] ALIGNED(CACHE_LINE); | |
} ALIGNED(CACHE_LINE); | |
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags) | |
{ | |
unsigned long ringsz = ROUNDUP_POW2(n_elems); | |
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) { | |
assert(0 && "invalid number of elements"); | |
return NULL; | |
} | |
if ((flags & ~SUPPORTED_FLAGS) != 0) { | |
assert(0 && "invalid flags"); | |
return NULL; | |
} | |
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element); | |
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE); | |
if (!lfr) | |
return NULL; | |
lfr->head = 0, lfr->tail = 0; | |
lfr->mask = ringsz - 1; | |
lfr->flags = flags; | |
for (ringidx_t i = 0; i < ringsz; i++) { | |
lfr->ring[i].ptr = NULL; | |
lfr->ring[i].idx = i - ringsz; | |
} | |
return lfr; | |
} | |
/* Release a ring. NULL is ignored. A ring that still holds elements
 * (head != tail) is not freed; debug builds assert instead.
 */
void lfring_free(lfring_t *lfr)
{
    if (lfr == NULL)
        return;
    if (lfr->head == lfr->tail) {
        osal_free(lfr);
        return;
    }
    assert(0 && "ring buffer not empty");
}
/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */ | |
static inline bool before(ringidx_t a, ringidx_t b) | |
{ | |
return (intptr_t)(a - b) < 0; | |
} | |
/* Advance '*loc' to 'neu' unless it already holds a later position.
 * Returns 'neu' if this call stored it. Otherwise returns the later value
 * already present, and '*loc' is left unchanged.
 */
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t seen = __atomic_load_n(loc, __ATOMIC_RELAXED);
    for (;;) {
        if (before(neu, seen)) /* someone already moved past neu */
            return seen;
        /* Weak CAS may fail spuriously; 'seen' is refreshed on failure. */
        if (__atomic_compare_exchange_n(loc, &seen, neu, true,
                                        __ATOMIC_RELEASE, __ATOMIC_RELAXED))
            return neu;
    }
}
/* Choose the position a lagging producer should retry from.
 *
 * Reads the shared position at 'loc'. If it is already past 'idx', jump
 * straight to it; otherwise step forward to the next slot after 'idx'.
 * Fills in the missing "continue with next slot" step, which was the
 * placeholder DDD that did not compile.
 */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* Continue with next slot */
        idx++;
    }
    return idx;
}
/* Enqueue elements at tail */ | |
uint32_t lfring_enqueue(lfring_t *lfr, | |
void *const *restrict elems, | |
uint32_t n_elems) | |
{ | |
intptr_t actual = 0; | |
ringidx_t mask = lfr->mask; | |
ringidx_t size = mask + 1; | |
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED); | |
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */ | |
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE); | |
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems); | |
if (actual <= 0) | |
return 0; | |
for (uint32_t i = 0; i < (uint32_t) actual; i++) { | |
assert(lfr->ring[tail & mask].idx == tail - size); | |
lfr->ring[tail & mask].ptr = *elems++; | |
lfr->ring[tail & mask].idx = tail; | |
tail++; | |
} | |
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE); | |
return (uint32_t) actual; | |
} | |
/* else: lock-free multi-producer */ | |
restart: | |
while ((uint32_t) actual < n_elems && | |
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) { | |
union { | |
struct element e; | |
ptrpair_t pp; | |
} old, neu; | |
void *elem = elems[actual]; | |
struct element *slot = &lfr->ring[tail & mask]; | |
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED); | |
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED); | |
do { | |
if (UNLIKELY(old.e.idx != tail - size)) { | |
if (old.e.idx != tail) { | |
/* We are far behind. Restart with fresh index */ | |
tail = cond_reload(tail, &lfr->tail); | |
goto restart; | |
} | |
/* slot already enqueued */ | |
tail++; /* Try next slot */ | |
goto restart; | |
} | |
/* Found slot that was used one lap back. | |
* Try to enqueue next element. | |
*/ | |
neu.e.ptr = elem; | |
neu.e.idx = tail; /* Set idx on enqueue */ | |
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp)); | |
/* Enqueue succeeded */ | |
actual++; | |
tail++; /* Continue with next slot */ | |
} | |
(void) cond_update(&lfr->tail, tail); | |
return (uint32_t) actual; | |
} | |
/* Determine the tail as seen by a consumer that found the ring empty.
 *
 * Single-producer rings publish tail with a release store, so a fresh
 * acquire load is enough. With multiple producers, a slot can be written by
 * the enqueue CAS before that producer advances lfr->tail (its cond_update
 * comes later). So scan forward from 'tail' while each slot's idx equals its
 * own position, never going more than one full ring past 'head'. Then try
 * to publish the position found.
 *
 * Fills in the placeholders KKK (ring size) and TTT (acquire load of the
 * slot's idx), which did not compile.
 */
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
               tail)
        tail++;
    tail = cond_update(&lfr->tail, tail);
    return tail;
}
/* Dequeue elements from head */ | |
uint32_t lfring_dequeue(lfring_t *lfr, | |
void **restrict elems, | |
uint32_t n_elems, | |
uint32_t *index) | |
{ | |
ringidx_t mask = lfr->mask; | |
intptr_t actual; | |
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED); | |
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); | |
do { | |
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); | |
if (UNLIKELY(actual <= 0)) { | |
/* Ring buffer is empty, scan for new but unreleased elements */ | |
tail = find_tail(lfr, head, tail); | |
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); | |
if (actual <= 0) | |
return 0; | |
} | |
for (uint32_t i = 0; i < (uint32_t) actual; i++) | |
elems[i] = lfr->ring[(head + i) & mask].ptr; | |
smp_fence(LoadStore); // Order loads only | |
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */ | |
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED); | |
break; | |
} | |
/* else: lock-free multi-consumer */ | |
} while (!__atomic_compare_exchange_n( | |
&lfr->head, &head, /* Updated on failure */ | |
/* XXXXX */ HHH, | |
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); | |
*index = (uint32_t) head; | |
return (uint32_t) actual; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Lock-free multiple-producer (MP) /multiple-consumer (MC) ring buffer. */ | |
#pragma once | |
#include <stddef.h> | |
#include <stdint.h> | |
/* Ring behaviour flags for lfring_alloc(): combine one producer flag with
 * one consumer flag. The multi-producer/multi-consumer values are 0 and
 * therefore the default. The single variants (SP/SC) skip coordination
 * between threads on that side. The caller must then ensure at most one
 * thread enqueues (SP) or dequeues (SC) at a time.
 */
enum {
    LFRING_FLAG_MP = 0x0000 /* Multiple producer */,
    LFRING_FLAG_SP = 0x0001 /* Single producer */,
    LFRING_FLAG_MC = 0x0000 /* Multi consumer */,
    LFRING_FLAG_SC = 0x0002 /* Single consumer */,
};
typedef struct lfring lfring_t;
/* Allocate ring buffer with space for at least 'n_elems' elements; the
 * capacity is rounded up to a power of two.
 * Requires 'n_elems' != 0, 'n_elems' <= 0x80000000, and 'flags' made only of
 * LFRING_FLAG_* values. Returns NULL if an argument is invalid (debug builds
 * also assert) or if memory cannot be allocated.
 */
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags);
/* Free ring buffer. 'lfr' may be NULL.
 * The ring buffer must be empty; a non-empty ring is not freed.
 */
void lfring_free(lfring_t *lfr);
/* Enqueue up to 'n_elems' pointers from 'elems' on the ring buffer.
 * The number of actually enqueued elements is returned. It is less than
 * 'n_elems' (possibly 0) when the ring runs out of free slots.
 */
uint32_t lfring_enqueue(lfring_t *lfr, void *const elems[], uint32_t n_elems);
/* Dequeue up to 'n_elems' pointers from the ring buffer into 'elems'.
 * The number of actually dequeued elements is returned (0 if empty).
 * When it is non-zero, '*index' receives the ring position of the first
 * dequeued element, truncated to 32 bits. Positions count up from 0 in
 * enqueue order. '*index' is not written when 0 is returned.
 */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void *elems[],
                        uint32_t n_elems,
                        uint32_t *index);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CC = gcc
CFLAGS = -O2 -g -Wall -I.
CFLAGS += -fsanitize=thread
LDFLAGS = -fsanitize=thread

all: lfring

# Control the build verbosity
ifeq ("$(VERBOSE)","1")
Q :=
VECHO = @true
else
Q := @
VECHO = @printf
endif

OBJS := lfring.o tests.o
# Per-object dependency files, e.g. .lfring.o.d (must match -MF below)
deps := $(OBJS:%.o=.%.o.d)

lfring: $(OBJS)
	$(VECHO) " LD\t$@\n"
	$(Q)$(CC) $(LDFLAGS) -o $@ $^

%.o: %.c
	$(VECHO) " CC\t$@\n"
	$(Q)$(CC) -o $@ $(CFLAGS) -c -MMD -MF .$@.d $<

clean:
	rm -f $(OBJS) $(deps) lfring
	rm -rf *.dSYM

.PHONY: all clean

-include $(deps)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include "lfring.h" | |
#define EX_HASHSTR(s) #s
#define EX_STR(s) EX_HASHSTR(s)
/* Abort with the source location and the failed expression text when 'exp'
 * is false. Wrapped in do { } while (0) so that 'EXPECT(x);' is a single
 * statement and works in an unbraced if/else. __LINE__ is an int, so it is
 * printed with %d.
 */
#define EXPECT(exp)                                                       \
    do {                                                                  \
        if (!(exp)) {                                                     \
            fprintf(stderr, "FAILURE @ %s:%d; %s\n", __FILE__, __LINE__,  \
                    EX_STR(exp));                                         \
            abort();                                                      \
        }                                                                 \
    } while (0)
static void test_ringbuffer(uint32_t flags) | |
{ | |
void *vec[4]; | |
uint32_t ret; | |
uint32_t idx; | |
lfring_t *rb = lfring_alloc(2, flags); | |
EXPECT(rb != NULL); | |
ret = lfring_dequeue(rb, vec, 1, &idx); | |
EXPECT(ret == 0); | |
ret = lfring_enqueue(rb, (void *[]){(void *) 1}, 1); | |
EXPECT(ret == 1); | |
ret = lfring_dequeue(rb, vec, 1, &idx); | |
EXPECT(ret == 1); | |
EXPECT(idx == 0); | |
EXPECT(vec[0] == (void *) 1); | |
ret = lfring_dequeue(rb, vec, 1, &idx); | |
EXPECT(ret == 0); | |
ret = lfring_enqueue(rb, (void *[]){(void *) 2, (void *) 3, (void *) 4}, 3); | |
EXPECT(ret == 2); | |
ret = lfring_dequeue(rb, vec, 1, &idx); | |
EXPECT(ret == 1); | |
EXPECT(idx == 1); | |
EXPECT(vec[0] == (void *) 2); | |
ret = lfring_dequeue(rb, vec, 4, &idx); | |
EXPECT(ret == 1); | |
EXPECT(idx == 2); | |
EXPECT(vec[0] == (void *) 3); | |
lfring_free(rb); | |
} | |
int main(void) | |
{ | |
printf("testing MPMC lock-free ring\n"); | |
test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_MC); | |
printf("testing MPSC lock-free ring\n"); | |
test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_SC); | |
printf("testing SPMC lock-free ring\n"); | |
test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_MC); | |
printf("testing SPSC lock-free ring\n"); | |
test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_SC); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment