Skip to content

Instantly share code, notes, and snippets.

@lithdew
Created December 17, 2020 19:25
Show Gist options
  • Save lithdew/308993d92836f88d6492e49cf9af54fd to your computer and use it in GitHub Desktop.
Save lithdew/308993d92836f88d6492e49cf9af54fd to your computer and use it in GitHub Desktop.
queue
/// Bounded FIFO queue of `T` with room for `capacity` items, guarded by a
/// mutex and integrated with `pike.Task` so that `push` and `pop` suspend the
/// calling frame instead of blocking.
///
/// Only a single parked reader is tracked (`pop` stores one `reader` pointer),
/// whereas any number of writers may be parked in an intrusive linked list.
///
/// NOTE(review): slots are addressed as `counter % capacity` with wrapping
/// counters; this stays consistent across a `usize` wrap-around only when
/// `capacity` is a power of two.
pub fn Queue(comptime T: type, comptime capacity: comptime_int) type {
    return struct {
        const Self = @This();

        /// A parked `pop` caller; declared as a local of the suspended `pop`.
        const Reader = struct {
            task: pike.Task,
            /// Set by `close()` before the task is dispatched.
            dead: bool = false,
        };

        /// A parked `push` caller; declared as a local of the suspended `push`.
        /// Writers form a singly linked list whose head also tracks the tail.
        const Writer = struct {
            next: ?*Writer = null,
            /// Last node of the list. Only meaningful on the list head.
            tail: ?*Writer = null,
            task: pike.Task,
            /// Set by `close()` before the task is dispatched.
            dead: bool = false,
        };

        lock: Mutex = .{},
        items: [capacity]T = undefined,
        /// Set once by `close()`; read atomically by `push` and `pop`.
        dead: bool = false,
        /// Wrapping count of items consumed so far.
        head: usize = 0,
        /// Wrapping count of items produced so far; `tail -% head` is the
        /// number of items currently queued.
        tail: usize = 0,
        /// The currently parked reader, if any.
        reader: ?*Reader = null,
        /// Head of the list of parked writers, if any.
        writers: ?*Writer = null,

        /// Marks the queue as closed and wakes every parked reader and writer
        /// with their `dead` flag set, so that they return
        /// `error.OperationCancelled`. Calling `close()` again is a no-op.
        pub fn close(self: *Self) void {
            // The exchange returns the previous value: only the first caller proceeds.
            if (@atomicRmw(bool, &self.dead, .Xchg, true, .Monotonic)) {
                return;
            }

            // Detach all waiters under the lock, wake them outside of it.
            const held = self.lock.acquire();
            const maybe_reader = self.reader;
            self.reader = null;
            var maybe_writers = self.writers;
            self.writers = null;
            held.release();

            if (maybe_reader) |reader| {
                reader.dead = true;
                pike.dispatch(&reader.task, .{});
            }

            while (maybe_writers) |writer| {
                writer.dead = true;
                // Read `next` before dispatching: presumably the writer's frame
                // may resume and go away once its task has been dispatched.
                maybe_writers = writer.next;
                pike.dispatch(&writer.task, .{});
            }
        }

        /// Returns the number of items currently queued.
        pub fn pending(self: *Self) usize {
            const held = self.lock.acquire();
            defer held.release();
            return self.tail -% self.head;
        }

        /// Appends `item` to the queue, suspending the calling frame while the
        /// queue is full.
        ///
        /// Returns `error.AlreadyShutdown` if the queue was closed before the
        /// caller could enqueue or park, and `error.OperationCancelled` if
        /// `close()` woke the caller while it was parked.
        pub fn push(self: *Self, item: T) !void {
            if (@atomicLoad(bool, &self.dead, .Monotonic)) return error.AlreadyShutdown;

            while (true) {
                const held = self.lock.acquire();

                if (self.tail -% self.head < capacity) {
                    self.items[self.tail % capacity] = item;
                    self.tail +%= 1;

                    // Detach the parked reader (if any) and wake it outside the lock.
                    const maybe_reader = self.reader;
                    self.reader = null;
                    held.release();

                    if (maybe_reader) |reader| {
                        pike.dispatch(&reader.task, .{});
                    }
                    return;
                }

                // `close()` sets `dead` before taking the lock to detach waiters.
                // If it already ran, parking now would never be followed by a
                // wake-up, so bail out instead of suspending forever.
                if (@atomicLoad(bool, &self.dead, .Monotonic)) {
                    held.release();
                    return error.AlreadyShutdown;
                }

                var writer = Writer{ .task = pike.Task.init(@frame()) };
                suspend {
                    if (self.writers) |writers| {
                        // Append after the current tail and advance the tail;
                        // without advancing it, later writers would overwrite
                        // this link and earlier waiters would never be woken.
                        writers.tail.?.next = &writer;
                        writers.tail = &writer;
                    } else {
                        writer.tail = &writer;
                        self.writers = &writer;
                    }
                    held.release();
                }

                if (writer.dead) return error.OperationCancelled;
                // Otherwise a `pop` made room: retry from the top.
            }
        }

        /// Moves up to `dst.len` queued items into `dst` in FIFO order and
        /// returns how many were written, suspending the calling frame while
        /// the queue is empty. Items that do not fit into `dst` stay queued.
        ///
        /// Returns `error.AlreadyShutdown` if the queue was closed before the
        /// caller could dequeue or park, and `error.OperationCancelled` if
        /// `close()` woke the caller while it was parked.
        pub fn pop(self: *Self, dst: []T) !usize {
            if (@atomicLoad(bool, &self.dead, .Monotonic)) return error.AlreadyShutdown;

            while (true) {
                const held = self.lock.acquire();

                const available = self.tail -% self.head;
                if (available != 0) {
                    // Never write past the end of the caller's buffer.
                    const count = if (available < dst.len) available else dst.len;

                    var i: usize = 0;
                    while (i < count) : (i += 1) {
                        dst[i] = self.items[(self.head +% i) % capacity];
                    }
                    self.head +%= count;

                    // Space was only freed if something was actually removed.
                    var maybe_writers: ?*Writer = null;
                    if (count != 0) {
                        maybe_writers = self.writers;
                        self.writers = null;
                    }
                    held.release();

                    while (maybe_writers) |writer| {
                        // Read `next` before dispatching: presumably the writer's
                        // frame may resume and go away once dispatched.
                        maybe_writers = writer.next;
                        pike.dispatch(&writer.task, .{});
                    }
                    return count;
                }

                // Same reasoning as in `push`: do not park after `close()` has
                // already detached and woken the waiters.
                if (@atomicLoad(bool, &self.dead, .Monotonic)) {
                    held.release();
                    return error.AlreadyShutdown;
                }

                var reader = Reader{ .task = pike.Task.init(@frame()) };
                suspend {
                    self.reader = &reader;
                    held.release();
                }

                if (reader.dead) return error.OperationCancelled;
                // Otherwise a `push` added an item: retry from the top.
            }
        }
    };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment