Skip to content

Instantly share code, notes, and snippets.

@lithdew
Created February 21, 2021 10:23
Show Gist options
  • Save lithdew/6720ef41f679132f17026b68b52e124e to your computer and use it in GitHub Desktop.
Save lithdew/6720ef41f679132f17026b68b52e124e to your computer and use it in GitHub Desktop.
zig: disruptor
const std = @import("std");
const mem = std.mem;
const math = std.math;
const VACANT = math.maxInt(usize);
/// Sequence-coordinated ring of `capacity` entries of `T`, shared between any
/// number of publishers and up to `num_readers` registered readers.
///
/// The type only hands out and tracks sequence numbers ("cursors"); callers
/// index `buffer` themselves. Publishers call `nextPublisherEntry`, fill their
/// slot, then call `commitPublisherEntry`. Readers call `register`, then loop
/// over `waitForBlocking` / `releaseEntry`, and finally `deregister`.
pub fn RingBuffer(comptime T: type, comptime num_readers: comptime_int, comptime capacity: comptime_int) type {
    return struct {
        const Self = @This();

        /// Lowest reader cursor found by the most recent publisher scan; used
        /// to seed the cursor of a newly registered reader.
        slowest_reader: usize,
        /// Highest committed sequence; readers may consume up to and including it.
        max_read_cursor: usize,
        /// Last sequence handed out to a publisher.
        write_cursor: usize,
        /// One cursor per reader slot; `VACANT` marks an unclaimed slot.
        reader_cursors: [num_readers]usize,
        /// Entry storage. Left undefined until publishers write to it.
        buffer: [capacity]T,

        /// Returns a ring with all cursors at 0 and every reader slot vacant.
        pub fn init() Self {
            return Self{
                .slowest_reader = 0,
                .max_read_cursor = 0,
                .write_cursor = 0,
                .reader_cursors = [_]usize{VACANT} ** num_readers,
                .buffer = undefined,
            };
        }

        /// Claims a vacant reader slot, writes its index to `id.*` and returns
        /// the reader's starting cursor: the current `slowest_reader`, or 1
        /// when that is still 0. Spins for as long as no slot is vacant.
        pub fn register(self: *Self, id: *usize) callconv(.Inline) usize {
            while (true) : (std.Thread.spinLoopHint()) {
                var n: usize = 0;
                while (n < self.reader_cursors.len) : (n += 1) {
                    const start = @atomicLoad(usize, &self.slowest_reader, .Acquire);
                    // A non-null result means the slot was taken (or the weak
                    // CAS failed spuriously); move on to the next slot.
                    if (@cmpxchgWeak(usize, &self.reader_cursors[n], VACANT, start, .Release, .Monotonic) != null) continue;

                    id.* = n;
                    // The slot now holds `start`; reuse the local instead of
                    // re-reading a location publishers scan concurrently.
                    if (start == 0) {
                        @atomicStore(usize, &self.reader_cursors[n], 1, .Release);
                        return 1;
                    }
                    return start;
                }
            }
        }

        /// Marks reader slot `id` vacant so publishers stop waiting on it.
        pub fn deregister(self: *Self, id: usize) callconv(.Inline) void {
            @atomicStore(usize, &self.reader_cursors[id], VACANT, .Release);
        }

        /// Spins until `max_read_cursor` has reached `cursor.*`, then replaces
        /// `cursor.*` with the current `max_read_cursor`.
        pub fn waitForBlocking(self: *Self, cursor: *usize) callconv(.Inline) void {
            const wanted = cursor.*;
            while (wanted > @atomicLoad(usize, &self.max_read_cursor, .Monotonic)) : (std.Thread.spinLoopHint()) {}
            // Acquire pairs with the release increment in commitPublisherEntry
            // so the committed entries are visible to the reader.
            cursor.* = @atomicLoad(usize, &self.max_read_cursor, .Acquire);
        }

        /// Publishes `cursor` as reader `id`'s position. Publishers compare
        /// their claimed sequence against the smallest such position.
        pub fn releaseEntry(self: *Self, id: usize, cursor: usize) callconv(.Inline) void {
            // Release: the reader's accesses to the slots it is giving up must
            // complete before a publisher can observe this cursor and reuse them.
            @atomicStore(usize, &self.reader_cursors[id], cursor, .Release);
        }

        /// Claims the next sequence (previous `write_cursor` + 1) and spins
        /// until it is fewer than `capacity` ahead of the slowest registered
        /// reader. Every scan also stores the bound it used in `slowest_reader`.
        pub fn nextPublisherEntry(self: *Self) callconv(.Inline) usize {
            const incur: usize = 1 + @atomicRmw(usize, &self.write_cursor, .Add, 1, .Monotonic);
            while (true) : (std.Thread.spinLoopHint()) {
                var slowest_reader: usize = VACANT;
                var n: usize = 0;
                while (n < self.reader_cursors.len) : (n += 1) {
                    // Acquire pairs with the release store in releaseEntry.
                    const seq = @atomicLoad(usize, &self.reader_cursors[n], .Acquire);
                    if (seq < slowest_reader) slowest_reader = seq;
                }
                if (slowest_reader == VACANT) {
                    @setCold(true);
                    // No reader registered: use `incur` with its low
                    // `capacity - 1` bits cleared, which is always fewer than
                    // `capacity` behind `incur`, so the claim succeeds.
                    slowest_reader = incur - (incur & (capacity - 1));
                }
                @atomicStore(usize, &self.slowest_reader, slowest_reader, .Monotonic);
                if (incur - slowest_reader < capacity) {
                    return incur;
                }
            }
        }

        /// Makes sequence `cursor` readable. Spins until every earlier sequence
        /// has been committed, so commits become visible in sequence order.
        pub fn commitPublisherEntry(self: *Self, cursor: usize) callconv(.Inline) void {
            const required_read_sequence: usize = cursor - 1;
            while (@atomicLoad(usize, &self.max_read_cursor, .Monotonic) != required_read_sequence) : (std.Thread.spinLoopHint()) {}
            _ = @atomicRmw(usize, &self.max_read_cursor, .Add, 1, .Release);
        }
    };
}
// Benchmark configuration.
const NUM_ITEMS = 250_000_000;
const NUM_PRODUCERS = 1;
const NUM_CONSUMERS = 1;
// One reader slot per consumer thread: `register` spins while no slot is
// vacant, so fewer slots than consumers would leave a consumer stuck forever.
const TestRingBuffer = RingBuffer(usize, NUM_CONSUMERS, 2048);
/// Benchmark publisher: claims `NUM_ITEMS / NUM_PRODUCERS` sequences from the
/// shared ring, writes one value per sequence and commits it.
const Producer = struct {
    /// Index of this producer; also scales the values it writes.
    id: usize,
    /// Ring shared with the other producers and the consumers.
    buffer: *TestRingBuffer,
    /// Thread handle running `run`.
    thread: *std.Thread,

    /// Thread entry point. Prints a completion line once every item is published.
    fn run(self: *Producer) void {
        // Map sequences onto slots with the real slot count. The previous
        // `cursor % (2048 - 1)` never used the last slot and let two in-flight
        // sequences 2047 apart land on the same slot.
        const slot_count = self.buffer.buffer.len;
        var i: usize = 0;
        while (i < NUM_ITEMS / NUM_PRODUCERS) : (i += 1) {
            const cursor = self.buffer.nextPublisherEntry();
            // Commit only after the slot below has been written.
            defer self.buffer.commitPublisherEntry(cursor);
            self.buffer.buffer[cursor % slot_count] = (self.id + 1) * i;
        }
        std.debug.print("publisher {d} done!\n", .{self.id});
    }
};
/// Benchmark reader thread: registers with `buffer`, counts committed
/// sequences until it has seen `NUM_ITEMS / NUM_CONSUMERS` of them, prints a
/// completion line, then deregisters.
fn runConsumer(buffer: *TestRingBuffer) void {
    var id: usize = undefined;
    var next = buffer.register(&id);
    defer buffer.deregister(id);

    var available = next;
    var consumed: usize = 0;
    while (consumed < NUM_ITEMS / NUM_CONSUMERS) {
        // On return `available` holds the highest committed sequence.
        buffer.waitForBlocking(&available);

        // Count every sequence in [next, available].
        while (next <= available) : (next += 1) {
            consumed += 1;
        }

        // Publish the next unread sequence as this reader's position.
        available = next;
        buffer.releaseEntry(id, available);
    }
    std.debug.print("consumer {d} done!\n", .{id});
}
/// Benchmark entry point: runs the consumers and producers over one shared
/// ring, waits for all of them, and prints the elapsed time in milliseconds.
pub fn main() !void {
    var buffer = TestRingBuffer.init();
    var timer = try std.time.Timer.start();

    // Start the consumers first. A publisher never blocks while no reader is
    // registered, so a consumer that registers late starts past sequence 1,
    // never counts its full share of items, and hangs.
    var consumers: [NUM_CONSUMERS]*std.Thread = undefined;
    for (consumers) |*consumer| consumer.* = try std.Thread.spawn(&buffer, runConsumer);

    // Wait until every consumer holds a reader slot. Consumers stay registered
    // until they have counted their share, so this only waits on thread start-up.
    var registered: usize = 0;
    while (registered < NUM_CONSUMERS) : (std.Thread.spinLoopHint()) {
        registered = 0;
        for (buffer.reader_cursors) |*slot| {
            if (@atomicLoad(usize, slot, .Acquire) != VACANT) registered += 1;
        }
    }

    var producers: [NUM_PRODUCERS]Producer = undefined;
    for (producers) |*producer, id| {
        producer.id = id;
        producer.buffer = &buffer;
        producer.thread = try std.Thread.spawn(producer, Producer.run);
    }

    for (producers) |producer| producer.thread.wait();
    for (consumers) |consumer| consumer.wait();
    std.debug.print("Done! Took {d}.\n", .{timer.read() / std.time.ns_per_ms});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment