Created
January 23, 2021 23:17
-
-
Save lithdew/156a822951a01e8f2da5847cb12f8a8a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const Pool = @import("Pool.zig"); | |
const List = @import("DoublyLinkedList.zig"); | |
const mem = std.mem; | |
const testing = std.testing; | |
const Group = @This(); | |
/// Bookkeeping record embedded in every spawned task's closure (see `run`).
const Child = struct { | |
/// Bytes that `purge` hands to `allocator.free`; `run` sets this to the
/// bytes of the whole heap-allocated closure.
stack: []u8, | |
/// Intrusive list node used to link this child into `Group.free_list`.
entry: List.Node = .{}, | |
}; | |
/// Couples a suspended async frame with a `Pool.Runnable` whose callback
/// resumes that frame.
const Frame = struct {
    frame: anyframe,
    runnable: Pool.Runnable = .{ .runFn = runFn },

    /// Suspends the calling async function and submits it to `pool`.
    /// The caller continues once `runFn` is invoked for the submitted
    /// runnable (presumably by the pool; `Pool` is defined elsewhere).
    fn yield(pool: *Pool) void {
        var pending = Frame{ .frame = @frame() };
        suspend {
            pool.schedule(&pending.runnable, .{});
        }
    }

    /// Runnable callback: recovers the enclosing `Frame` from its
    /// `runnable` field and resumes the stored async frame.
    fn runFn(runnable: *Pool.Runnable) void {
        const owner = @fieldParentPtr(Frame, "runnable", runnable);
        resume owner.frame;
    }
};
/// Pool on which spawned tasks and the waiter are scheduled.
pool: *Pool, | |
/// Held around every access to `free_list`, `live_count` and `waiter` in this file.
lock: std.Thread.Mutex = .{}, | |
/// Children whose task function has returned; their memory is released by `purge`.
free_list: List = .{}, | |
/// Number of tasks started by `run` that have not yet finished.
live_count: usize = 0, | |
/// Frame suspended in `wait`, scheduled again when `live_count` reaches zero.
waiter: ?*Frame = null, | |
/// Creates an empty group bound to `pool`; every other field keeps its
/// declared default.
pub fn init(pool: *Pool) Group {
    return .{ .pool = pool };
}
/// Waits until no task is live, then frees every child on the free list
/// using `allocator`.
/// This may suspend inside `wait`; `main` in this file starts it with `async`.
pub fn deinit(self: *Group, allocator: *mem.Allocator) void { | |
self.wait(); | |
self.purge(allocator); | |
} | |
/// Suspends the caller until `live_count` reaches zero.
///
/// Returns immediately when no task is live. Otherwise the caller's frame is
/// recorded in `self.waiter` and the function suspends; the task that brings
/// `live_count` to zero schedules the waiter on the pool again.
///
/// Only a single waiter is recorded: a second concurrent call would overwrite
/// `self.waiter`.
pub fn wait(self: *Group) void {
    const held = self.lock.acquire();

    if (self.live_count == 0) {
        held.release();
        return;
    }

    // Keep the waiter record in a named local (as `Frame.yield` does) rather
    // than taking the address of an anonymous temporary: the pointer stored in
    // `self.waiter` has to stay valid and mutable across the suspend point.
    var frame = Frame{ .frame = @frame() };
    suspend {
        self.waiter = &frame;
        // Release only after the waiter is published, so a finishing task
        // cannot observe `live_count == 0` without seeing the waiter.
        held.release();
    }
}
/// Drains `free_list`, releasing each finished child's memory through
/// `allocator`. The group lock is held for the whole call.
pub fn purge(self: *Group, allocator: *mem.Allocator) void {
    const held = self.lock.acquire();
    defer held.release();

    while (true) {
        const node = self.free_list.pop() orelse break;
        // Recover the `Child` that embeds this list node.
        const finished = @fieldParentPtr(Child, "entry", node);
        allocator.free(finished.stack);
    }
}
/// Starts `function` with `function_args` as a new task of this group.
///
/// Children already on the free list are first released via `purge`. A
/// `Closure` holding the arguments and the task's async frame is then
/// allocated with `allocator`; it is freed by a later `purge` after the task
/// has appended itself to the free list.
///
/// Returns an allocator error if the closure cannot be allocated.
pub fn run( | |
self: *Group, | |
allocator: *mem.Allocator, | |
comptime function: anytype, | |
function_args: anytype, | |
) mem.Allocator.Error!void { | |
const Args = @TypeOf(function_args); | |
// Per-call heap record: list bookkeeping, owning group, the arguments, and
// the storage for the async frame of `Closure.run` itself.
const Closure = struct { | |
child: Child, | |
parent: *Group, | |
params: Args, | |
frame: @Frame(run), | |
// Task body executed inside `closure.frame`.
fn run(closure: *@This()) void { | |
// Suspend and go through the pool before calling the user function
// (presumably so the function runs on a pool worker — confirm against Pool).
Frame.yield(closure.parent.pool); | |
// The return value of `function` is not used afterwards.
const result = @call(.{}, function, closure.params); | |
const held = closure.parent.lock.acquire(); | |
defer held.release(); | |
// NOTE(review): the closure goes on the free list while this function is
// still executing inside `closure.frame`; a `purge` running right after the
// lock is released could free it — confirm this is safe.
closure.parent.free_list.append(&closure.child.entry); | |
closure.parent.live_count -= 1; | |
// Last live task: hand the waiter (if any) back to the pool.
if (closure.parent.live_count == 0) { | |
if (closure.parent.waiter) |frame| { | |
closure.parent.waiter = null; | |
closure.parent.pool.schedule(&frame.runnable, .{}); | |
} | |
} | |
} | |
}; | |
self.purge(allocator); | |
const closure = try allocator.create(Closure); | |
// NOTE(review): no statement after this point can return an error, so this
// errdefer never fires as written.
errdefer allocator.destroy(closure); | |
// `stack` covers the bytes of the whole closure; `purge` frees this slice.
closure.child = .{ .stack = mem.asBytes(closure) }; | |
closure.parent = self; | |
closure.params = function_args; | |
// Count the task as live before it is started; the task decrements the
// counter itself when it finishes.
{ | |
const held = self.lock.acquire(); | |
defer held.release(); | |
self.live_count += 1; | |
} | |
closure.frame = async closure.run(); | |
} | |
/// Demo task A: prints a fixed greeting via `std.debug.print`.
fn runA() void {
    const greeting = "hello from A!\n";
    std.debug.print(greeting, .{});
}
/// Demo task B: prints a fixed greeting via `std.debug.print`.
fn runB() void {
    const greeting = "hello from B!\n";
    std.debug.print(greeting, .{});
}
/// Demo task C: prints a fixed greeting via `std.debug.print`.
fn runC() void {
    const greeting = "hello from C!\n";
    std.debug.print(greeting, .{});
}
/// Demo driver: creates a pool and a group, then spawns the three demo tasks
/// in a loop.
pub fn main() !void { | |
// var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; | |
// defer if (gpa.deinit()) unreachable; | |
// const allocator = &gpa.allocator; | |
const allocator = std.heap.c_allocator; | |
var pool = Pool.init(.{}); | |
var group = Group.init(&pool); | |
// NOTE(review): this loop has no `break`; it only exits when `group.run`
// returns an error, which `try` propagates out of `main`. The teardown
// below is therefore not reached as written.
while (true) { | |
try group.run(allocator, runA, .{}); | |
try group.run(allocator, runB, .{}); | |
try group.run(allocator, runC, .{}); | |
} | |
// Start the group's teardown as an async frame, shut the pool down, then
// collect the frame. `nosuspend` asserts the await itself will not suspend
// (presumably `pool.deinit()` returns only after the work is done — confirm).
var frame = async group.deinit(allocator); | |
pool.deinit(); | |
nosuspend await frame; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment