Skip to content

Instantly share code, notes, and snippets.

@lithdew
Last active November 21, 2020 15:02
Show Gist options
  • Save lithdew/73d8163f49895046934c89c526a6df65 to your computer and use it in GitHub Desktop.
Save lithdew/73d8163f49895046934c89c526a6df65 to your computer and use it in GitHub Desktop.
atomic write buffer (zig)
pub fn Buffer(comptime T: type, comptime capacity: comptime_int) type {
return struct {
items: [capacity]T = undefined,
reader: Event = .{},
writer: Event = .{},
dead: bool = false,
head: usize = 0,
tail: usize = 0,
const Self = @This();
pub fn push(self: *Self, item: T) !void {
while (true) {
if (@atomicLoad(bool, &self.dead, .Monotonic)) {
return error.OperationCancelled;
}
const head = @atomicLoad(usize, &self.head, .Acquire);
if (self.tail -% head < capacity) {
self.items[self.tail % capacity] = item;
@atomicStore(usize, &self.tail, self.tail +% 1, .Release);
self.reader.notify();
return;
}
self.writer.wait();
}
}
pub fn pop(self: *Self, dst: []T) !usize {
while (true) {
const tail = @atomicLoad(usize, &self.tail, .Acquire);
const popped = tail -% self.head;
if (popped != 0) {
const item = self.items[self.head % capacity];
var i: usize = 0;
while (i < popped) : (i += 1) {
dst[i] = self.items[(self.head + i) % capacity];
}
@atomicStore(usize, &self.head, tail, .Release);
self.writer.notify();
return popped;
}
if (@atomicLoad(bool, &self.dead, .Acquire)) {
return error.OperationCancelled;
}
self.reader.wait();
}
}
pub fn close(self: *Self) void {
if (@atomicRmw(usize, &self.dead, .Xchg, true, .Acquire)) {
return;
}
self.reader.notify();
self.writer.notify();
}
};
}
pub const Event = struct {
state: ?*pike.Task = null,
var notified: pike.Task = undefined;
fn wait(self: *Event) void {
var task = pike.Task.init(@frame());
suspend {
var state = @atomicLoad(?*pike.Task, &self.state, .Monotonic);
while (true) {
const new_state = if (state == &notified) null else if (state == null) &task else unreachable;
state = @cmpxchgWeak(
?*pike.Task,
&self.state,
state,
new_state,
.Release,
.Monotonic,
) orelse {
if (new_state == null) pike.dispatch(&task, .{});
break;
};
}
}
}
fn notify(self: *Event) void {
var state = @atomicLoad(?*pike.Task, &self.state, .Monotonic);
while (true) {
if (state == &notified)
return;
const new_state = if (state == null) &notified else null;
state = @cmpxchgWeak(
?*pike.Task,
&self.state,
state,
new_state,
.Acquire,
.Monotonic,
) orelse {
if (state) |task| pike.dispatch(task, .{});
break;
};
}
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment