Created
February 21, 2021 10:23
-
-
Save lithdew/6720ef41f679132f17026b68b52e124e to your computer and use it in GitHub Desktop.
zig: disruptor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std");
const mem = std.mem;
const math = std.math;

/// Sentinel stored in a reader-cursor slot that no reader currently owns.
/// It is the largest `usize`, so it never wins a "minimum cursor" scan.
const VACANT = math.maxInt(usize);
/// Returns a disruptor-style ring buffer type holding `capacity` entries of `T`
/// with room for `num_readers` registered readers.
///
/// Publishers claim a sequence with `nextPublisherEntry`, fill the matching slot
/// of `buffer` themselves, and make it visible with `commitPublisherEntry`.
/// Readers obtain a slot id and starting cursor from `register`, wait with
/// `waitForBlocking`, and report progress with `releaseEntry`.
///
/// The no-reader fallback in `nextPublisherEntry` masks with `capacity - 1`,
/// which only equals `sequence % capacity` when `capacity` is a power of two.
pub fn RingBuffer(comptime T: type, comptime num_readers: comptime_int, comptime capacity: comptime_int) type {
    return struct {
        const Self = @This();

        /// Last "slowest reader" value computed by a publisher; newly registered
        /// readers start from it.
        slowest_reader: usize,
        /// Highest sequence that has been committed by a publisher.
        max_read_cursor: usize,
        /// Last sequence handed out to a publisher (the first one handed out is 1).
        write_cursor: usize,
        /// One cursor per reader slot; `VACANT` marks a free slot.
        reader_cursors: [num_readers]usize,
        /// Entry storage. Left undefined until publishers write to it.
        buffer: [capacity]T,

        /// Creates an empty ring buffer with every reader slot vacant.
        pub fn init() Self {
            var reader_cursors: [num_readers]usize = undefined;
            mem.set(usize, &reader_cursors, VACANT);
            return Self{
                .slowest_reader = 0,
                .max_read_cursor = 0,
                .write_cursor = 0,
                .reader_cursors = reader_cursors,
                .buffer = undefined,
            };
        }

        /// Claims a vacant reader slot, writes its index to `id`, and returns the
        /// cursor the reader starts at. Spins until a slot can be claimed.
        pub fn register(self: *Self, id: *usize) callconv(.Inline) usize {
            while (true) : (std.Thread.spinLoopHint()) {
                var n: usize = 0;
                while (n < self.reader_cursors.len) : (n += 1) {
                    const slowest = @atomicLoad(usize, &self.slowest_reader, .Acquire);
                    // Sequences handed to publishers start at 1, so a reader never starts at 0.
                    const start: usize = if (slowest == 0) 1 else slowest;
                    // A non-null result means the slot was taken or the weak CAS failed
                    // spuriously; either way move on, the outer loop rescans.
                    if (@cmpxchgWeak(usize, &self.reader_cursors[n], VACANT, start, .Release, .Monotonic) != null) continue;
                    id.* = n;
                    return start;
                }
            }
        }

        /// Frees reader slot `id` by marking it vacant again.
        pub fn deregister(self: *Self, id: usize) callconv(.Inline) void {
            @atomicStore(usize, &self.reader_cursors[id], VACANT, .Release);
        }

        /// Spins until the sequence in `cursor.*` has been committed, then replaces
        /// `cursor.*` with the highest committed sequence.
        pub fn waitForBlocking(self: *Self, cursor: *usize) callconv(.Inline) void {
            const wanted: usize = cursor.*;
            while (wanted > @atomicLoad(usize, &self.max_read_cursor, .Monotonic)) : (std.Thread.spinLoopHint()) {}
            // Acquire pairs with the release increment in `commitPublisherEntry`.
            cursor.* = @atomicLoad(usize, &self.max_read_cursor, .Acquire);
        }

        /// Publishes `cursor` as the progress of reader slot `id`.
        pub fn releaseEntry(self: *Self, id: usize, cursor: usize) callconv(.Inline) void {
            // Release: the reader's accesses to the entries it is giving up must be
            // ordered before a publisher observes this cursor and reuses those slots.
            @atomicStore(usize, &self.reader_cursors[id], cursor, .Release);
        }

        /// Claims the next sequence for a publisher and spins until it is less than
        /// `capacity` ahead of the slowest registered reader. Returns the sequence.
        pub fn nextPublisherEntry(self: *Self) callconv(.Inline) usize {
            const incur: usize = 1 + @atomicRmw(usize, &self.write_cursor, .Add, 1, .Monotonic);
            while (true) : (std.Thread.spinLoopHint()) {
                var slowest_reader: usize = VACANT;
                var n: usize = 0;
                while (n < self.reader_cursors.len) : (n += 1) {
                    // Acquire pairs with the release store in `releaseEntry`.
                    const seq = @atomicLoad(usize, &self.reader_cursors[n], .Acquire);
                    if (seq < slowest_reader) slowest_reader = seq;
                }
                if (slowest_reader == VACANT) {
                    // No reader is registered: use the start of the current lap so the
                    // publisher is never blocked.
                    slowest_reader = incur - (incur & (capacity - 1));
                }
                @atomicStore(usize, &self.slowest_reader, slowest_reader, .Monotonic);
                if (incur - slowest_reader < capacity) {
                    return incur;
                }
            }
        }

        /// Makes the entry claimed as `cursor` visible to readers. Spins until every
        /// earlier sequence has been committed, so commits happen in sequence order.
        pub fn commitPublisherEntry(self: *Self, cursor: usize) callconv(.Inline) void {
            const required_read_sequence: usize = cursor - 1;
            while (@atomicLoad(usize, &self.max_read_cursor, .Monotonic) != required_read_sequence) : (std.Thread.spinLoopHint()) {}
            _ = @atomicRmw(usize, &self.max_read_cursor, .Add, 1, .Release);
        }
    };
}
/// Ring buffer type used by the benchmark: `usize` entries, 1 reader slot, 2048 entries.
const TestRingBuffer = RingBuffer(usize, 1, 2048);

/// Total number of entries published across all producers.
const NUM_ITEMS = 250_000_000;
/// Number of producer threads started by `main`.
const NUM_PRODUCERS = 1;
/// Number of consumer threads started by `main`.
const NUM_CONSUMERS = 1;
/// Benchmark producer: publishes `NUM_ITEMS / NUM_PRODUCERS` entries into `buffer`.
const Producer = struct {
    /// Index of this producer; also used to derive the published values.
    id: usize,
    /// Ring buffer shared with the other threads.
    buffer: *TestRingBuffer,
    /// Handle of the thread executing `run`.
    thread: *std.Thread,

    /// Thread entry point: claims, fills and commits one entry per iteration,
    /// then prints a completion message.
    fn run(self: *Producer) void {
        // Wrap sequences by the real slot count so that every slot is used and two
        // sequences that may be in flight together never share a slot.
        const slot_count = self.buffer.buffer.len;
        var i: usize = 0;
        while (i < NUM_ITEMS / NUM_PRODUCERS) : (i += 1) {
            const cursor = self.buffer.nextPublisherEntry();
            // Runs at the end of this iteration, after the slot has been written.
            defer self.buffer.commitPublisherEntry(cursor);
            self.buffer.buffer[cursor % slot_count] = (self.id + 1) * i;
        }
        std.debug.print("publisher {d} done!\n", .{self.id});
    }
};
/// Benchmark consumer thread: registers as a reader on `buffer`, counts committed
/// entries until `NUM_ITEMS / NUM_CONSUMERS` have been seen, prints a completion
/// message and finally deregisters.
fn runConsumer(buffer: *TestRingBuffer) void {
    var id: usize = undefined;
    var next = buffer.register(&id);
    defer buffer.deregister(id);

    var limit = next;
    var consumed: usize = 0;
    while (consumed < NUM_ITEMS / NUM_CONSUMERS) {
        // On return `limit` holds the highest committed sequence.
        buffer.waitForBlocking(&limit);

        // Count every entry in [next, limit].
        while (next <= limit) {
            consumed += 1;
            next += 1;
        }

        // Report the first sequence not yet consumed and wait on it next time.
        limit = next;
        buffer.releaseEntry(id, limit);
    }
    std.debug.print("consumer {d} done!\n", .{id});
}
/// Benchmark entry point: runs `NUM_CONSUMERS` consumers and `NUM_PRODUCERS`
/// producers over one shared ring buffer and prints the elapsed milliseconds.
pub fn main() !void {
    var buffer = TestRingBuffer.init();
    var timer = try std.time.Timer.start();

    // Start the consumers first. While no reader is registered the publisher is
    // never blocked, so a consumer that registers late starts past the first
    // entries, never reaches its item count, and spins forever.
    var consumers: [NUM_CONSUMERS]*std.Thread = undefined;
    for (consumers) |*consumer| consumer.* = try std.Thread.spawn(&buffer, runConsumer);

    // Wait until the consumers hold their reader slots before publishing starts.
    const slot_count = buffer.reader_cursors.len;
    const expected_readers: usize = if (NUM_CONSUMERS < slot_count) NUM_CONSUMERS else slot_count;
    while (true) : (std.Thread.spinLoopHint()) {
        var registered: usize = 0;
        for (buffer.reader_cursors) |*slot| {
            if (@atomicLoad(usize, slot, .Acquire) != VACANT) registered += 1;
        }
        if (registered >= expected_readers) break;
    }

    var producers: [NUM_PRODUCERS]Producer = undefined;
    for (producers) |*producer, id| {
        producer.id = id;
        producer.buffer = &buffer;
        producer.thread = try std.Thread.spawn(producer, Producer.run);
    }

    for (producers) |producer| producer.thread.wait();
    for (consumers) |consumer| consumer.wait();

    std.debug.print("Done! Took {d}.\n", .{timer.read() / std.time.ns_per_ms});
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment