Created
December 17, 2020 19:25
-
-
Save lithdew/308993d92836f88d6492e49cf9af54fd to your computer and use it in GitHub Desktop.
queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub fn Queue(comptime T: type, comptime capacity: comptime_int) type { | |
return struct { | |
const Self = @This(); | |
const Reader = struct { | |
task: pike.Task, | |
dead: bool = false, | |
}; | |
const Writer = struct { | |
next: ?*Writer = null, | |
tail: ?*Writer = null, | |
task: pike.Task, | |
dead: bool = false, | |
}; | |
lock: Mutex = .{}, | |
items: [capacity]T = undefined, | |
dead: bool = false, | |
head: usize = 0, | |
tail: usize = 0, | |
reader: ?*Reader = null, | |
writers: ?*Writer = null, | |
pub fn close(self: *Self) void { | |
if (@atomicRmw(bool, &self.dead, .Xchg, true, .Monotonic)) { | |
return; | |
} | |
const held = self.lock.acquire(); | |
const maybe_reader = blk: { | |
if (self.reader) |reader| { | |
self.reader = null; | |
break :blk reader; | |
} | |
break :blk null; | |
}; | |
var maybe_writers = blk: { | |
if (self.writers) |writers| { | |
self.writers = null; | |
break :blk writers; | |
} | |
break :blk null; | |
}; | |
held.release(); | |
if (maybe_reader) |reader| { | |
reader.dead = true; | |
pike.dispatch(&reader.task, .{}); | |
} | |
while (maybe_writers) |writer| { | |
writer.dead = true; | |
maybe_writers = writer.next; | |
pike.dispatch(&writer.task, .{}); | |
} | |
} | |
pub fn pending(self: *Self) usize { | |
const held = self.lock.acquire(); | |
defer held.release(); | |
return self.tail -% self.head; | |
} | |
pub fn push(self: *Self, item: T) !void { | |
if (@atomicLoad(bool, &self.dead, .Monotonic)) return error.AlreadyShutdown; | |
while (true) { | |
const held = self.lock.acquire(); | |
if (self.tail -% self.head < capacity) { | |
self.items[self.tail % capacity] = item; | |
self.tail +%= 1; | |
const maybe_reader = blk: { | |
if (self.reader) |reader| { | |
self.reader = null; | |
break :blk reader; | |
} | |
break :blk null; | |
}; | |
held.release(); | |
if (maybe_reader) |reader| { | |
pike.dispatch(&reader.task, .{}); | |
} | |
return; | |
} | |
var writer = Writer{ .task = pike.Task.init(@frame()) }; | |
suspend { | |
if (self.writers) |writers| { | |
writers.tail.?.next = &writer; | |
} else { | |
writer.tail = &writer; | |
self.writers = &writer; | |
} | |
held.release(); | |
} | |
if (writer.dead) return error.OperationCancelled; | |
} | |
} | |
pub fn pop(self: *Self, dst: []T) !usize { | |
if (@atomicLoad(bool, &self.dead, .Monotonic)) return error.AlreadyShutdown; | |
while (true) { | |
const held = self.lock.acquire(); | |
const count = self.tail -% self.head; | |
if (count != 0) { | |
var i: usize = 0; | |
while (i < count) : (i += 1) { | |
dst[i] = self.items[(self.head + i) % capacity]; | |
} | |
self.head = self.tail; | |
var maybe_writers = blk: { | |
if (self.writers) |writers| { | |
self.writers = null; | |
break :blk writers; | |
} | |
break :blk null; | |
}; | |
held.release(); | |
while (maybe_writers) |writer| { | |
maybe_writers = writer.next; | |
pike.dispatch(&writer.task, .{}); | |
} | |
return count; | |
} | |
var reader = Reader{ .task = pike.Task.init(@frame()) }; | |
suspend { | |
self.reader = &reader; | |
held.release(); | |
} | |
if (reader.dead) return error.OperationCancelled; | |
} | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment