Skip to content

Instantly share code, notes, and snippets.

@jserv
Last active August 1, 2022 07:26
Show Gist options
  • Save jserv/f810c45ad4423f406f9e0dbe9dabadc9 to your computer and use it in GitHub Desktop.
Save jserv/f810c45ad4423f406f9e0dbe9dabadc9 to your computer and use it in GitHub Desktop.
Lock-free multiple-producer (MP) / multiple-consumer (MC) ring buffer (**incomplete**)
#pragma once
/* Ordering masks understood by smp_fence(). */
#define LoadStore 0x12
#define StoreLoad 0x21

#if !defined(__x86_64__)
#error "Unsupported architecture"
#else
/*
 * Emit the fence requested by 'mask'.
 *
 *  - mask == 0:                   nothing is emitted.
 *  - mask contains all StoreLoad
 *    bits:                        a full "mfence" instruction.
 *  - any other non-zero mask:     a compiler-only barrier (empty asm with a
 *                                 "memory" clobber).
 */
static inline void smp_fence(unsigned int mask)
{
    if (mask == 0)
        return;

    if ((mask & StoreLoad) != StoreLoad) {
        /* Any fence but StoreLoad: stopping compiler reordering suffices */
        __asm__ volatile("" ::: "memory");
        return;
    }

    __asm__ volatile("mfence" ::: "memory");
}
#endif
#include "common.h"
#if defined(__x86_64__)
/* A 128-bit value viewed either as one __int128 ('ui') or as two 64-bit
 * words ('s.lo' and 's.hi'). lf_compare_exchange() uses it to split its
 * operands into the halves it hands to the cmpxchg16b instruction.
 */
union u128 {
struct {
uint64_t lo, hi;
} s;
__int128 ui;
};
/*
 * Atomic 128-bit compare-and-swap built on x86-64 "lock cmpxchg16b".
 *
 * var: location to update.
 * exp: in:  value expected to be found at *var;
 *      out: when the exchange did not happen, overwritten with the value
 *           the instruction left in rdx:rax.
 * neu: value to store into *var when it equals *exp.
 *
 * Returns true when the exchange was performed, false otherwise.
 *
 * NOTE(review): cmpxchg16b requires a 16-byte aligned memory operand; this
 * function does not check it -- confirm alignment at every call site.
 */
static inline bool lf_compare_exchange(register __int128 *var,
__int128 *exp,
__int128 neu)
{
/* Split both 128-bit operands into 64-bit halves for the register constraints */
union u128 cmp = {.ui = *exp}, with = {.ui = neu};
bool ret;
/* Expected value goes in rdx:rax ("d"/"a"), new value in rcx:rbx ("c"/"b");
 * setz turns the zero flag left by cmpxchg16b into the boolean result.
 */
__asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0"
: "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo)
: "c"(with.s.hi), "b"(with.s.lo)
: "cc", "memory");
/* On failure hand the value read back through rdx:rax to the caller */
if (UNLIKELY(!ret))
*exp = cmp.ui;
return ret;
}
#else
#error "Unsupported architecture"
#endif
#pragma once
/* Cache-line size in bytes assumed by this code (hard-coded, see FIXME). */
#define CACHE_LINE 64 /* FIXME: should be configurable */
/* Tags a function with the compiler's 'constructor' attribute. */
#define INIT_FUNCTION __attribute__((constructor))
/* Branch hints: tell the compiler the condition is expected true / false. */
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
/* Request an alignment of 'x' bytes for the annotated type, field or object. */
#define ALIGNED(x) __attribute__((__aligned__(x)))
/* Thread-local storage specifier: the C11 keyword when the compiler reports
 * C11 or newer, otherwise the GNU '__thread' extension.
 */
#if __STDC_VERSION__ >= 201112L
#define THREAD_LOCAL _Thread_local /* C11 */
#else
#define THREAD_LOCAL __thread /* GNU extension */
#endif
/*
 * Round 'x' up to the next power of two, as an unsigned long.
 *
 *  - x == 0 or x == 1 yields 1.
 *  - x already a power of two yields x.
 *  - x larger than the biggest power of two representable in unsigned long
 *    yields 0 ("does not fit"). Without this guard the shift count would
 *    equal the width of the type, which is undefined behaviour.
 *
 * 'x' is evaluated exactly once. Uses GNU statement expressions.
 */
#define ROUNDUP_POW2(x)                                                      \
    ({                                                                       \
        unsigned long rp2_val_ = (x);                                        \
        /* largest power of two that fits in unsigned long */                \
        const unsigned long rp2_top_ =                                       \
            1UL << (__SIZEOF_LONG__ * __CHAR_BIT__ - 1);                     \
        rp2_val_ <= 1UL        ? 1UL                                         \
        : rp2_val_ > rp2_top_  ? 0UL                                         \
                               : (1UL << (__SIZEOF_LONG__ * __CHAR_BIT__ -   \
                                          __builtin_clzl(rp2_val_ - 1UL)));  \
    })
/*
 * Round 'a' up to the nearest multiple of 'b'.
 * Each argument is evaluated exactly once; the result has the type produced
 * by the usual arithmetic conversions of the two operands.
 */
#define ROUNDUP(a, b)                                               \
    ({                                                              \
        __typeof__(a) ru_val_ = (a);                                \
        __typeof__(b) ru_step_ = (b);                               \
        __typeof__(ru_val_ + ru_step_ - 1) ru_chunks_ =             \
            (ru_val_ + ru_step_ - 1) / ru_step_;                    \
        ru_chunks_ * ru_step_;                                      \
    })
/* Integer type as wide as two pointers, so that a pair of pointer-sized
 * fields can be handled as one value: 64 bits when pointers are 4 bytes,
 * 128 bits otherwise (the code assumes 8-byte pointers in that case).
 */
#if __SIZEOF_POINTER__ == 4
typedef unsigned long long ptrpair_t; /* assume 64 bits */
#else /* __SIZEOF_POINTER__ == 8 */
typedef __int128 ptrpair_t;
#endif
#include <stdlib.h>
/*
 * Allocate at least 'size' bytes.
 *
 * alignment <= 1: plain malloc(size).
 * alignment  > 1: aligned_alloc() with 'size' rounded up to a multiple of
 *                 'alignment', as C11 expects for that function.
 *
 * Returns NULL when the allocation fails or when rounding 'size' up would
 * overflow size_t. Without that check the rounded size could wrap to a tiny
 * value and the caller would get a buffer far smaller than requested.
 * Release the memory with osal_free().
 */
static inline void *osal_alloc(size_t size, size_t alignment)
{
    if (alignment <= 1)
        return malloc(size);

    size_t rem = size % alignment;
    if (rem != 0) {
        size_t pad = alignment - rem;
        if (size > (size_t) -1 - pad)
            return NULL; /* size + pad would wrap around */
        size += pad;
    }
    return aligned_alloc(alignment, size);
}
/* Release memory through free(); the counterpart of osal_alloc(). */
static inline void osal_free(void *ptr)
{
free(ptr);
}
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>
#include "arch.h"
#include "common.h"
#include "lfring.h"
/* Every flag bit lfring_alloc() accepts; any other bit is rejected. */
#define SUPPORTED_FLAGS \
(LFRING_FLAG_SP | LFRING_FLAG_MP | LFRING_FLAG_SC | LFRING_FLAG_MC)
/*
 * Smaller of 'a' and 'b'; when they compare equal, 'b' is returned.
 * Each argument is evaluated exactly once (GNU statement expression).
 */
#define MIN(a, b)                          \
    ({                                     \
        __typeof__(a) min_lhs_ = (a);      \
        __typeof__(b) min_rhs_ = (b);      \
        (min_rhs_ > min_lhs_) ? min_lhs_   \
                              : min_rhs_;  \
    })
/* Ring position counter. It is not reduced modulo the ring size; the slot
 * is selected with '& mask' at the point of use.
 */
typedef uintptr_t ringidx_t;
/* One ring slot: the stored pointer plus the ring index it was written for.
 * The multi-producer enqueue updates both fields together with a single
 * double-width compare-and-swap.
 */
struct element {
void *ptr;
uintptr_t idx;
};
/* Ring buffer control block followed by the slot array.
 * 'head' and 'tail' are placed on separate CACHE_LINE-aligned boundaries,
 * and the slot array starts on its own aligned boundary as well.
 */
struct lfring {
ringidx_t head; /* next position to dequeue from */
ringidx_t tail ALIGNED(CACHE_LINE); /* next position to enqueue at */
uint32_t mask; /* ring size - 1 (size is a power of two) */
uint32_t flags; /* LFRING_FLAG_* given to lfring_alloc() */
struct element ring[] ALIGNED(CACHE_LINE); /* the slots */
} ALIGNED(CACHE_LINE);
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
unsigned long ringsz = ROUNDUP_POW2(n_elems);
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
assert(0 && "invalid number of elements");
return NULL;
}
if ((flags & ~SUPPORTED_FLAGS) != 0) {
assert(0 && "invalid flags");
return NULL;
}
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
/*
 * Destroy a ring buffer. NULL is accepted and ignored.
 * The ring must be empty (head == tail); otherwise an assert fires in debug
 * builds and the ring is left allocated.
 */
void lfring_free(lfring_t *lfr)
{
    if (lfr == NULL)
        return;

    const bool is_empty = (lfr->head == lfr->tail);
    if (is_empty) {
        osal_free(lfr);
        return;
    }

    assert(0 && "ring buffer not empty");
}
/* Serial-number comparison: true when 'a' precedes 'b', i.e. when the
 * wrapped difference a - b is negative once reinterpreted as signed.
 */
static inline bool before(ringidx_t a, ringidx_t b)
{
    const intptr_t distance = (intptr_t)(a - b);
    return distance < 0;
}
/*
 * Move *loc forward to 'neu' unless it already holds a later value.
 *
 * Returns the value *loc is known to hold afterwards: 'neu' when this call
 * stored it, or the later value that was found there instead.
 */
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t seen = __atomic_load_n(loc, __ATOMIC_RELAXED);
    for (;;) {
        /* Somebody already moved past us: keep their value */
        if (before(neu, seen))
            return seen;

        /* Weak CAS may fail spuriously; 'seen' is refreshed on failure */
        if (__atomic_compare_exchange_n(loc, &seen, neu,
                                        /* weak */ true, __ATOMIC_RELEASE,
                                        __ATOMIC_RELAXED))
            return neu;
    }
}
/*
 * Pick the next index to try after 'idx' turned out to be stale.
 *
 * Reads *loc; when that value is after 'idx' it is returned, otherwise the
 * result is idx + 1 so that the caller always makes progress.
 */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* Continue with next slot */
        idx++;
    }
    return idx;
}
/* Enqueue elements at tail.
 *
 * lfr:     ring buffer to add to.
 * elems:   array of at least 'n_elems' pointers to enqueue, in order.
 * n_elems: number of pointers offered.
 *
 * Returns how many elements were actually enqueued (0..n_elems); fewer than
 * requested when the ring runs out of free slots.
 *
 * With LFRING_FLAG_SP the slots are written directly and the new tail is
 * published with a release store. Otherwise each slot is claimed with a
 * double-width compare-and-swap on its (ptr, idx) pair.
 */
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0;
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1;
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
/* Free slots = head + size - tail, capped by the request */
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
/* A free slot still carries the index from one lap back */
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
/* Publish all slots written above in one step */
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
return (uint32_t) actual;
}
/* else: lock-free multi-producer */
restart:
/* Continue while elements remain and 'tail' is less than one lap ahead of head */
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
/* Overlay so a slot can be passed to the double-width CAS as one value */
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
/* Two separate loads; the CAS below fails if the pair was inconsistent */
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
 * Try to enqueue next element.
 */
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
/* Advance the shared tail unless it already holds a later value */
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
/*
 * Find the current end of the enqueued data on behalf of a consumer that
 * found the ring apparently empty.
 *
 * lfr:  ring buffer.
 * head: consumer's current head index.
 * tail: last tail value the consumer observed.
 *
 * Single-producer rings: a fresh acquire load of lfr->tail is returned.
 * Multi-producer rings: starting at 'tail', slots whose idx equals their
 * ring position have been written by a producer even if lfr->tail has not
 * been advanced yet. Scanning stops at the first slot that does not match,
 * or one full lap past 'head'. The shared tail is then moved forward with
 * cond_update() and the resulting value is returned.
 */
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    /* Acquire pairs with the producer's CAS so that the slot's ptr is
     * visible once its idx has been observed.
     */
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
               tail)
        tail++;
    tail = cond_update(&lfr->tail, tail);
    return tail;
}
/* Dequeue elements from head.
 *
 * lfr:     ring buffer to take from.
 * elems:   output array with room for at least 'n_elems' pointers.
 * n_elems: maximum number of elements to dequeue.
 * index:   output; receives the ring index (truncated to 32 bits) of the
 *          first element returned. Written only when the return value is
 *          non-zero.
 *
 * Returns how many elements were copied into 'elems'; 0 when the ring is
 * empty.
 *
 * With LFRING_FLAG_SC the new head is simply stored. Otherwise the head is
 * advanced with a compare-and-swap, and on failure the whole copy is
 * retried from the head value another consumer installed.
 */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do {
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            /* Ring buffer is empty, scan for new but unreleased elements */
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            if (actual <= 0)
                return 0;
        }
        /* Copy out speculatively; the head update below claims the slots */
        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;
        /* Keep the slot loads above ahead of the head update that follows */
        smp_fence(LoadStore);
        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }
        /* else: lock-free multi-consumer */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        head + actual,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    /* 'head' still holds the position the first returned element came from */
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
/* Lock-free multiple-producer (MP) / multiple-consumer (MC) ring buffer. */
#pragma once
#include <stddef.h>
#include <stdint.h>
/* Flags for lfring_alloc(). The multi-producer and multi-consumer values
 * are both zero, so they are what a ring gets when no bit is set; the SP and
 * SC bits select the single-producer / single-consumer code paths.
 */
enum {
LFRING_FLAG_MP = 0x0000 /* Multiple producer */,
LFRING_FLAG_SP = 0x0001 /* Single producer */,
LFRING_FLAG_MC = 0x0000 /* Multi consumer */,
LFRING_FLAG_SC = 0x0002 /* Single consumer */,
};
/* Opaque ring buffer handle; the layout is private to the implementation. */
typedef struct lfring lfring_t;
/* Allocate ring buffer with space for at least 'n_elems' elements.
 * 'n_elems' != 0 and 'n_elems' <= 0x80000000
 * 'flags' is a combination of LFRING_FLAG_* values.
 * Returns NULL on invalid arguments or when memory cannot be allocated.
 */
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags);
/* Free ring buffer.
 * The ring buffer must be empty
 * Passing NULL is allowed and does nothing.
 */
void lfring_free(lfring_t *lfr);
/* Enqueue elements on ring buffer.
 * 'elems' holds 'n_elems' pointers to enqueue, in order.
 * The number of actually enqueued elements is returned.
 */
uint32_t lfring_enqueue(lfring_t *lfr, void *const elems[], uint32_t n_elems);
/* Dequeue elements from ring buffer.
 * Up to 'n_elems' pointers are stored into 'elems'.
 * '*index' receives the ring index of the first dequeued element; it is
 * written only when at least one element is returned.
 * The number of actually dequeued elements is returned.
 */
uint32_t lfring_dequeue(lfring_t *lfr,
void *elems[],
uint32_t n_elems,
uint32_t *index);
CC = gcc
CFLAGS = -O2 -g -Wall -I.
# Build and link with ThreadSanitizer
CFLAGS += -fsanitize=thread
LDFLAGS = -fsanitize=thread

.PHONY: all clean
all: lfring

# Control the build verbosity
# VERBOSE=1 shows the full commands; otherwise only short "LD"/"CC" lines.
ifeq ("$(VERBOSE)","1")
Q :=
VECHO = @true
else
Q := @
VECHO = @printf
endif

OBJS := lfring.o tests.o
# One hidden dependency file per object, e.g. .lfring.o.d
deps := $(OBJS:%.o=.%.o.d)

lfring: $(OBJS)
	$(VECHO) " LD\t$@\n"
	$(Q)$(CC) $(LDFLAGS) -o $@ $^

# -MMD -MF writes the header dependencies to the file named in $(deps)
%.o: %.c
	@mkdir -p .$(DUT_DIR)
	$(VECHO) " CC\t$@\n"
	$(Q)$(CC) -o $@ $(CFLAGS) -c -MMD -MF .$@.d $<

clean:
	rm -f $(OBJS) $(deps) lfring
	rm -rf *.dSYM

-include $(deps)
#include <stdio.h>
#include <stdlib.h>
#include "lfring.h"
/* Two-step stringification: EX_STR() expands its argument, then quotes it. */
#define EX_HASHSTR(s) #s
#define EX_STR(s) EX_HASHSTR(s)
/*
 * Check a test condition. When 'exp' is false, print the file, line and
 * the text of the expression to stderr and abort().
 *
 * Wrapped in do { } while (0) so that "EXPECT(x);" is a single statement
 * and is safe inside an unbraced if/else.
 */
#define EXPECT(exp)                                                     \
    do {                                                                \
        if (!(exp)) {                                                   \
            fprintf(stderr, "FAILURE @ %s:%u; %s\n", __FILE__,          \
                    (unsigned) __LINE__, EX_STR(exp));                  \
            abort();                                                    \
        }                                                               \
    } while (0)
/* Single-threaded functional test of one ring configuration.
 * 'flags' is passed straight to lfring_alloc(). The test uses a ring of
 * two slots and checks the returned counts, the reported ring indexes and
 * the dequeued values; any mismatch aborts through EXPECT().
 */
static void test_ringbuffer(uint32_t flags)
{
void *vec[4];
uint32_t ret;
uint32_t idx;
lfring_t *rb = lfring_alloc(2, flags);
EXPECT(rb != NULL);
/* A new ring is empty */
ret = lfring_dequeue(rb, vec, 1, &idx);
EXPECT(ret == 0);
/* One element in, the same element out at ring index 0 */
ret = lfring_enqueue(rb, (void *[]){(void *) 1}, 1);
EXPECT(ret == 1);
ret = lfring_dequeue(rb, vec, 1, &idx);
EXPECT(ret == 1);
EXPECT(idx == 0);
EXPECT(vec[0] == (void *) 1);
/* Empty again */
ret = lfring_dequeue(rb, vec, 1, &idx);
EXPECT(ret == 0);
/* Offer three elements to a two-slot ring: only two are accepted */
ret = lfring_enqueue(rb, (void *[]){(void *) 2, (void *) 3, (void *) 4}, 3);
EXPECT(ret == 2);
ret = lfring_dequeue(rb, vec, 1, &idx);
EXPECT(ret == 1);
EXPECT(idx == 1);
EXPECT(vec[0] == (void *) 2);
/* Ask for more than is available: only the remaining element is returned */
ret = lfring_dequeue(rb, vec, 4, &idx);
EXPECT(ret == 1);
EXPECT(idx == 2);
EXPECT(vec[0] == (void *) 3);
/* The ring is empty here, as lfring_free() requires */
lfring_free(rb);
}
/* Run the functional test once for each producer/consumer combination. */
int main(void)
{
    static const struct {
        const char *banner; /* printed before the run, newline included */
        uint32_t flags;     /* passed to test_ringbuffer() */
    } runs[] = {
        {"testing MPMC lock-free ring\n", LFRING_FLAG_MP | LFRING_FLAG_MC},
        {"testing MPSC lock-free ring\n", LFRING_FLAG_MP | LFRING_FLAG_SC},
        {"testing SPMC lock-free ring\n", LFRING_FLAG_SP | LFRING_FLAG_MC},
        {"testing SPSC lock-free ring\n", LFRING_FLAG_SP | LFRING_FLAG_SC},
    };

    for (size_t i = 0; i < sizeof(runs) / sizeof(runs[0]); i++) {
        printf("%s", runs[i].banner);
        test_ringbuffer(runs[i].flags);
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment