Skip to content

Instantly share code, notes, and snippets.

@lithdew
Created January 22, 2021 16:21
Show Gist options
  • Save lithdew/442ebf89d35ae29c815e31ebb43471d6 to your computer and use it in GitHub Desktop.
Save lithdew/442ebf89d35ae29c815e31ebb43471d6 to your computer and use it in GitHub Desktop.
const std = @import("std");
const mem = std.mem;
const testing = std.testing;
const Pool = @import("Pool.zig");
const Counter = @import("Counter.zig");
const List = @import("DoublyLinkedList.zig");
const Mutex = std.Thread.Mutex;
const Group = @This();
/// A unit of work that can be added to a `Group` and run on the group's pool.
///
/// Meant to be embedded as a field of a caller-defined struct; callbacks
/// receive a `*Child` and may recover the enclosing struct with
/// `@fieldParentPtr`.
pub const Child = struct {
    /// Called when the pool runs this child's `runnable`.
    run_callback: fn (*Child) void,
    /// Optional hook called by the group when it evicts this child after the
    /// child has finished running.
    cleanup_callback: ?fn (*Child) void = null,
    /// Group this child was added to; set by `Group.add`.
    group: ?*Group = null,
    /// Intrusive node linking this child into the group's lists.
    entry: List.Node = .{},
    /// Handle passed to the pool; dispatches to `run`.
    runnable: Pool.Runnable = .{ .runFn = run },

    /// Pool entry point: runs the user callback, then records completion in
    /// the owning group and wakes the group's waiter if this was the last
    /// outstanding child.
    fn run(runnable: *Pool.Runnable) void {
        const child = @fieldParentPtr(Group.Child, "runnable", runnable);
        const group = child.group orelse unreachable;
        // Copy the pool pointer now: once the lock below is released and the
        // waiter is woken, the group must no longer be touched by this thread.
        const pool = group.pool;

        child.run_callback(child);

        var to_wake: ?*Waiter = null;
        {
            const held = group.lock.acquire();
            defer held.release();

            // Move this child from the running list to the list of finished
            // children awaiting eviction.
            group.node_list.remove(&child.entry);
            group.free_list.append(&child.entry);
            group.count -= 1;

            if (group.count == 0) {
                // Claim the waiter (if any) while still holding the lock.
                to_wake = group.waiter;
                group.waiter = null;
            }
        }

        // Schedule the waiter only after the lock has been released, so the
        // resumed `wait` caller cannot start tearing the group down while
        // this thread is still inside the group's critical section.
        if (to_wake) |waiter| {
            pool.schedule(&waiter.runnable, .{});
        }
    }
};
/// Bridges a suspended `Group.wait` call to the pool: when the pool runs
/// `runnable`, the stored frame is resumed.
const Waiter = struct {
    /// Suspended frame of the `wait` call to resume.
    frame: *@Frame(Group.wait),
    /// Handle passed to the pool; dispatches to `run`.
    runnable: Pool.Runnable = .{ .runFn = run },

    /// Pool entry point: recovers the owning `Waiter` from its embedded
    /// runnable and resumes the frame it carries.
    fn run(runnable: *Pool.Runnable) void {
        const waiter = @fieldParentPtr(Waiter, "runnable", runnable);
        resume waiter.frame;
    }
};
/// Pool on which children and the waiter are scheduled.
pool: *Pool,
/// Acquired around updates to `node_list`, `free_list`, `count` and `waiter`
/// in `add`, `wait` and `Child.run`.
lock: Mutex = .{},
/// Children that have been added and have not yet finished running.
node_list: List = .{},
/// Children that have finished running and are awaiting `evict`.
free_list: List = .{},
/// Number of added children that have not yet finished running.
count: usize = 0,
/// Registered by `wait`; scheduled on the pool by the child that brings
/// `count` down to zero.
waiter: ?*Waiter = null,
/// Creates an empty group whose children will be scheduled on `pool`.
/// All other fields take their declared defaults.
pub fn init(pool: *Pool) Group {
    return .{ .pool = pool };
}
/// Runs the cleanup callback of every finished child still awaiting eviction.
///
/// Holds the group lock while evicting, like the `evict` call in `add`:
/// `evict` mutates `free_list`, which completing children append to under the
/// same lock. Taking it here also waits for a child that is still inside its
/// completion critical section.
pub fn deinit(self: *Group) void {
    const held = self.lock.acquire();
    defer held.release();

    self.evict();
}
/// Suspends the calling async frame until every child added to the group has
/// finished running. Returns immediately if no child is outstanding.
///
/// The frame is resumed from the pool: the child that brings `count` to zero
/// schedules the registered waiter.
pub fn wait(self: *Group) void {
    // The waiter must outlive the suspend block, so it is a named local of
    // this async frame rather than a temporary.
    var waiter: Waiter = .{ .frame = @frame() };

    const held = self.lock.acquire();

    // If every child has already finished, nobody is left to schedule the
    // waiter; suspending here would never be resumed.
    if (self.count == 0) {
        held.release();
        return;
    }

    suspend {
        self.waiter = &waiter;
        // Release last: once the lock is dropped a finishing child may
        // schedule the waiter and this frame may be resumed on another thread.
        held.release();
    }
}
/// Registers `child` with this group and schedules it on the group's pool.
///
/// `child` must not already belong to a group. While the lock is held, this
/// also evicts children that have already finished (running their cleanup
/// callbacks).
pub fn add(self: *Group, child: *Group.Child) !void {
    // A child may only ever be added to one group.
    if (child.group != null) unreachable;
    child.group = self;

    const held = self.lock.acquire();
    self.node_list.append(&child.entry);
    self.count += 1;
    self.evict();
    held.release();

    // Hand the child to the pool only after the bookkeeping above is done.
    self.pool.schedule(&child.runnable, .{});
}
/// Drains `free_list`, invoking the cleanup callback (when one is set) of
/// each finished child removed from it.
///
/// Performs no locking itself.
fn evict(self: *Group) void {
    while (true) {
        const node = self.free_list.shift() orelse break;
        const finished = @fieldParentPtr(Child, "entry", node);
        const cleanup = finished.cleanup_callback orelse continue;
        cleanup(finished);
    }
}
/// Example child used by `runTest`: prints a greeting when run.
const A = struct {
    child: Group.Child = .{ .run_callback = run },

    /// Run callback for the embedded child. The enclosing `A` can be
    /// recovered with `@fieldParentPtr(A, "child", child)`; it is not needed
    /// here, so the parameter is explicitly discarded.
    fn run(child: *Group.Child) void {
        _ = child;
        std.debug.print("hello from A!\n", .{});
    }
};
/// Example child used by `runTest`: prints a greeting when run.
const B = struct {
    child: Group.Child = .{ .run_callback = run },

    /// Run callback for the embedded child. The enclosing `B` can be
    /// recovered with `@fieldParentPtr(B, "child", child)`; it is not needed
    /// here, so the parameter is explicitly discarded.
    fn run(child: *Group.Child) void {
        _ = child;
        std.debug.print("hello from B!\n", .{});
    }
};
/// Example child used by `runTest`: prints a greeting when run.
const C = struct {
    child: Group.Child = .{ .run_callback = run },

    /// Run callback for the embedded child. The enclosing `C` can be
    /// recovered with `@fieldParentPtr(C, "child", child)`; it is not needed
    /// here, so the parameter is explicitly discarded.
    fn run(child: *Group.Child) void {
        _ = child;
        std.debug.print("hello from C!\n", .{});
    }
};
/// Demo driver: adds one `A`, one `B` and one `C` child to a fresh group on
/// `pool`, in that order, then waits on the group. The group is deinitialized
/// on every exit path.
fn runTest(pool: *Pool) !void {
    var group = Group.init(pool);
    defer group.deinit();

    var a: A = .{};
    var b: B = .{};
    var c: C = .{};

    // Same order as adding them one by one; the first failing `add` aborts.
    for ([_]*Group.Child{ &a.child, &b.child, &c.child }) |member| {
        try group.add(member);
    }

    group.wait();
}
/// Program entry point: starts `runTest` as an async frame on a fresh pool,
/// tears the pool down, then collects the frame's result.
pub fn main() !void {
    var pool = Pool.init(.{});
    // Runs `runTest` on this thread until it first suspends (in `group.wait`).
    var frame = async runTest(&pool);
    // NOTE(review): presumably `deinit` blocks until all scheduled work,
    // including the group's waiter, has run — confirm against Pool.zig.
    pool.deinit();
    // `nosuspend` asserts that the await does not suspend, i.e. that the
    // frame has already completed by this point.
    try nosuspend await frame;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment