Created
January 22, 2021 16:21
-
-
Save lithdew/442ebf89d35ae29c815e31ebb43471d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const mem = std.mem; | |
const testing = std.testing; | |
const Pool = @import("Pool.zig"); | |
const Counter = @import("Counter.zig"); | |
const List = @import("DoublyLinkedList.zig"); | |
const Mutex = std.Thread.Mutex; | |
const Group = @This(); | |
pub const Child = struct { | |
run_callback: fn (*Child) void, | |
cleanup_callback: ?fn (*Child) void = null, | |
group: ?*Group = null, | |
entry: List.Node = .{}, | |
runnable: Pool.Runnable = .{ .runFn = run }, | |
fn run(runnable: *Pool.Runnable) void { | |
const child = @fieldParentPtr(Group.Child, "runnable", runnable); | |
const group = child.group orelse unreachable; | |
defer { | |
const held = group.lock.acquire(); | |
defer held.release(); | |
group.node_list.remove(&child.entry); | |
group.free_list.append(&child.entry); | |
group.count -= 1; | |
if (group.count == 0) { | |
if (group.waiter) |waiter| { | |
group.pool.schedule(&waiter.runnable, .{}); | |
group.waiter = null; | |
} | |
} | |
} | |
child.run_callback(child); | |
} | |
}; | |
const Waiter = struct { | |
frame: *@Frame(Group.wait), | |
runnable: Pool.Runnable = .{ .runFn = run }, | |
fn run(runnable: *Pool.Runnable) void { | |
const self = @fieldParentPtr(Waiter, "runnable", runnable); | |
resume self.frame; | |
} | |
}; | |
pool: *Pool, | |
lock: Mutex = .{}, | |
node_list: List = .{}, | |
free_list: List = .{}, | |
count: usize = 0, | |
waiter: ?*Waiter = null, | |
pub fn init(pool: *Pool) Group { | |
return Group{ .pool = pool }; | |
} | |
pub fn deinit(self: *Group) void { | |
self.evict(); | |
} | |
pub fn wait(self: *Group) void { | |
suspend { | |
const held = self.lock.acquire(); | |
defer held.release(); | |
self.waiter = &Waiter{ .frame = @frame() }; | |
} | |
} | |
pub fn add(self: *Group, child: *Group.Child) !void { | |
if (child.group != null) { | |
unreachable; | |
} | |
child.group = self; | |
{ | |
const held = self.lock.acquire(); | |
defer held.release(); | |
self.node_list.append(&child.entry); | |
self.count += 1; | |
self.evict(); | |
} | |
self.pool.schedule(&child.runnable, .{}); | |
} | |
fn evict(self: *Group) void { | |
while (self.free_list.shift()) |entry| { | |
const evicted = @fieldParentPtr(Child, "entry", entry); | |
if (evicted.cleanup_callback) |cleanup| { | |
cleanup(evicted); | |
} | |
} | |
} | |
const A = struct { | |
child: Group.Child = .{ .run_callback = run }, | |
fn run(child: *Group.Child) void { | |
const self = @fieldParentPtr(@This(), "child", child); | |
std.debug.print("hello from A!\n", .{}); | |
} | |
}; | |
const B = struct { | |
child: Group.Child = .{ .run_callback = run }, | |
fn run(child: *Group.Child) void { | |
const self = @fieldParentPtr(@This(), "child", child); | |
std.debug.print("hello from B!\n", .{}); | |
} | |
}; | |
const C = struct { | |
child: Group.Child = .{ .run_callback = run }, | |
fn run(child: *Group.Child) void { | |
const self = @fieldParentPtr(@This(), "child", child); | |
std.debug.print("hello from C!\n", .{}); | |
} | |
}; | |
fn runTest(pool: *Pool) !void { | |
var group = Group.init(pool); | |
defer group.deinit(); | |
var a: A = .{}; | |
var b: B = .{}; | |
var c: C = .{}; | |
try group.add(&a.child); | |
try group.add(&b.child); | |
try group.add(&c.child); | |
group.wait(); | |
} | |
pub fn main() !void { | |
var pool = Pool.init(.{}); | |
var frame = async runTest(&pool); | |
pool.deinit(); | |
try nosuspend await frame; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment