Skip to content

Instantly share code, notes, and snippets.

@lithdew
Created January 23, 2021 23:17
Show Gist options
  • Save lithdew/156a822951a01e8f2da5847cb12f8a8a to your computer and use it in GitHub Desktop.
Save lithdew/156a822951a01e8f2da5847cb12f8a8a to your computer and use it in GitHub Desktop.
const std = @import("std");
const Pool = @import("Pool.zig");
const List = @import("DoublyLinkedList.zig");
const mem = std.mem;
const testing = std.testing;
const Group = @This();
/// Bookkeeping record embedded in every task closure created by `run`.
const Child = struct {
    /// Byte view of the closure allocation this record belongs to; `purge`
    /// passes it to `allocator.free`.
    stack: []u8,
    /// Intrusive list hook used to link the closure into the group's `free_list`.
    entry: List.Node = List.Node{},
};
/// Couples a suspended async frame with a `Pool.Runnable` whose callback
/// resumes that frame.
const Frame = struct {
    /// The suspended frame that `runFn` resumes.
    frame: anyframe,
    /// Runnable handed to `Pool.schedule`; its callback is `runFn`.
    runnable: Pool.Runnable = .{ .runFn = runFn },

    /// Suspends the calling frame and schedules it on `pool`. The caller
    /// continues once the scheduled runnable is executed and `runFn` resumes it.
    fn yield(pool: *Pool) void {
        var parked = Frame{ .frame = @frame() };
        suspend {
            pool.schedule(&parked.runnable, .{});
        }
    }

    /// `Pool.Runnable` callback: recovers the enclosing `Frame` from its
    /// `runnable` field and resumes the stored frame.
    fn runFn(runnable: *Pool.Runnable) void {
        const owner = @fieldParentPtr(Frame, "runnable", runnable);
        resume owner.frame;
    }
};
/// Pool on which task closures and the waiter are scheduled.
pool: *Pool,
/// Guards `free_list`, `live_count` and `waiter`.
lock: std.Thread.Mutex = .{},
/// Closures of finished tasks, linked through `Child.entry`, awaiting `purge`.
free_list: List = .{},
/// Number of tasks started by `run` that have not finished yet.
live_count: usize = 0,
/// Frame registered by `wait`; scheduled on `pool` when `live_count` drops to zero.
waiter: ?*Frame = null,
/// Creates an empty group that schedules its tasks on `pool`.
/// All other fields start at their defaults (no live tasks, no waiter).
pub fn init(pool: *Pool) Group {
    return .{ .pool = pool };
}
/// Waits (via `wait`) until no task is live, then releases the memory of all
/// finished tasks with `purge`. `allocator` must be the allocator that was
/// passed to `run`, since `purge` frees the closures with it.
pub fn deinit(self: *Group, allocator: *mem.Allocator) void {
    Group.wait(self);
    Group.purge(self, allocator);
}
/// Suspends the calling async frame until `live_count` reaches zero.
///
/// Returns immediately when no task is live. Otherwise the caller is
/// registered in `self.waiter` and suspended; the last task to finish
/// schedules it on the pool again. Only a single waiter is recorded, so a
/// second concurrent call would overwrite the first registration.
pub fn wait(self: *Group) void {
    // Named frame-local node (same pattern as `Frame.yield`): it must stay
    // addressable and mutable until the finishing task schedules it, which
    // the address of a temporary struct literal does not guarantee.
    var waiter = Frame{ .frame = @frame() };

    const held = self.lock.acquire();
    if (self.live_count == 0) {
        held.release();
        return;
    }

    suspend {
        // Publish the waiter while still holding the lock, so a task cannot
        // observe `live_count == 0` before the registration is visible.
        self.waiter = &waiter;
        held.release();
    }
}
/// Frees the closure memory of every finished task currently on `free_list`.
/// `lock` is held for the whole sweep and released on return.
pub fn purge(self: *Group, allocator: *mem.Allocator) void {
    const held = self.lock.acquire();
    defer held.release();

    while (true) {
        const node = self.free_list.pop() orelse break;
        // Each list node is the `entry` field of a `Child`; step back to it.
        const finished = @fieldParentPtr(Child, "entry", node);
        allocator.free(finished.stack);
    }
}
/// Starts `function(function_args...)` as a new task of this group.
///
/// A heap-allocated closure holds the arguments and the task's async frame.
/// The task first yields to the pool via `Frame.yield`, then calls `function`,
/// and finally, under `lock`, parks its closure on `free_list`, decrements
/// `live_count`, and schedules the registered waiter if it was the last task.
///
/// Closures of previously finished tasks are freed first via `purge`, so
/// `allocator` should be the same allocator on every call.
///
/// Returns an allocator error if the closure cannot be allocated.
pub fn run(
self: *Group,
allocator: *mem.Allocator,
comptime function: anytype,
function_args: anytype,
) mem.Allocator.Error!void {
const Args = @TypeOf(function_args);
const Closure = struct {
// Bookkeeping record; `child.entry` links this closure into `free_list`.
child: Child,
// Group that started this task.
parent: *Group,
// Copy of the arguments passed to `function`.
params: Args,
// Storage for the async frame of the task body below.
frame: @Frame(run),
// Task body executed asynchronously for this closure.
fn run(closure: *@This()) void {
// Suspend and get rescheduled through the pool before doing any work.
Frame.yield(closure.parent.pool);
// NOTE(review): `result` is never used; the return value of `function` is dropped.
const result = @call(.{}, function, closure.params);
const held = closure.parent.lock.acquire();
defer held.release();
// NOTE(review): the closure is handed to `free_list` while its own frame is
// still executing; a concurrent `purge` could presumably free it right after
// the lock is released and before this function has fully returned — confirm.
closure.parent.free_list.append(&closure.child.entry);
closure.parent.live_count -= 1;
if (closure.parent.live_count == 0) {
// Last live task: hand the waiter (if any) back to the pool exactly once.
if (closure.parent.waiter) |frame| {
closure.parent.waiter = null;
closure.parent.pool.schedule(&frame.runnable, .{});
}
}
}
};
// Reclaim closures of tasks that have already finished.
self.purge(allocator);
const closure = try allocator.create(Closure);
errdefer allocator.destroy(closure);
// `stack` is a byte view of the whole closure; `purge` later frees it as `[]u8`.
// NOTE(review): the memory is allocated with `Closure`'s alignment but released
// through a `[]u8` slice — confirm the allocator in use tolerates that.
closure.child = .{ .stack = mem.asBytes(closure) };
closure.parent = self;
closure.params = function_args;
{
// Count the task as live before it is started.
const held = self.lock.acquire();
defer held.release();
self.live_count += 1;
}
// Start the task; its frame is stored inside the closure itself.
closure.frame = async closure.run();
}
/// Demo task A: prints its greeting via `std.debug.print`.
fn runA() void {
    const greeting = "hello from A!\n";
    std.debug.print(greeting, .{});
}
/// Demo task B: prints its greeting via `std.debug.print`.
fn runB() void {
    const greeting = "hello from B!\n";
    std.debug.print(greeting, .{});
}
/// Demo task C: prints its greeting via `std.debug.print`.
fn runC() void {
    const greeting = "hello from C!\n";
    std.debug.print(greeting, .{});
}
/// Demo entry point: spawns batches of the three demo tasks on a pool-backed
/// group, then waits for them and shuts the pool down.
///
/// Returns an allocator error if a task closure cannot be allocated.
pub fn main() !void {
    // Alternative allocator setup, kept for reference:
    // var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
    // defer if (gpa.deinit()) unreachable;
    // const allocator = &gpa.allocator;
    const allocator = std.heap.c_allocator;

    var pool = Pool.init(.{});
    var group = Group.init(&pool);

    // Bounded on purpose: an unconditional `while (true)` never exits, which
    // would leave the shutdown sequence below unreachable.
    const rounds: usize = 1000;
    var round: usize = 0;
    while (round < rounds) : (round += 1) {
        try group.run(allocator, runA, .{});
        try group.run(allocator, runB, .{});
        try group.run(allocator, runC, .{});
    }

    // `deinit` suspends until all tasks are done, so start it asynchronously,
    // shut the pool down, and only then collect the finished frame.
    // NOTE(review): assumes `pool.deinit()` runs all outstanding work before
    // returning — confirm against Pool.zig.
    var frame = async group.deinit(allocator);
    pool.deinit();
    nosuspend await frame;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment