Base for next event loop
const std = @import("std"); | |
pub const Loop = struct { | |
io_poller: io.Poller, | |
pub fn init(self: *Loop) void {} // TODO: stub; io_poller setup is not written yet
}; | |
const sync = struct { | |
}; | |
const io = struct { | |
}; | |
const os = struct { | |
const Backend = if (std.builtin.os.tag == .windows) | |
WindowsBackend | |
else if (std.builtin.link_libc) | |
PosixBackend | |
else if (std.builtin.os.tag == .linux) | |
LinuxBackend | |
else | |
@compileError("Platform not supported"); | |
pub fn spinLoopHint() void { | |
switch (std.builtin.arch) { | |
.i386, .x86_64 => asm volatile("pause" ::: "memory"), | |
.aarch64 => asm volatile("yield" ::: "memory"), | |
else => {}, | |
} | |
} | |
pub const Lock = struct { | |
lock: Backend.OsLock = .{}, | |
pub fn tryAcquire(self: *Lock) ?Held { | |
if (self.lock.tryAcquire()) | |
return Held{ .lock = self }; | |
return null; | |
} | |
pub fn acquire(self: *Lock) Held { | |
self.lock.acquire(); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Lock, | |
pub fn release(self: Held) void { | |
self.lock.lock.release();
} | |
}; | |
}; | |
fn ReturnTypeOf(comptime func: anytype) type {
return @typeInfo(@TypeOf(func)).Fn.return_type.?; | |
} | |
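// Thread.spawn heap-allocates a Closure holding the call arguments plus a
// Handle; join() waits for the OS thread, copies the result out, and then
// invokes freeFn so the backend can release that allocation.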
pub const Thread = struct { | |
pub fn Handle(comptime T: type) type { | |
return struct { | |
result: T, | |
freeFn: fn(*@This()) void, | |
thread_handle: Backend.OsThread, | |
pub fn join(self: *@This()) T {
self.thread_handle.join(); | |
const result = self.result; | |
(self.freeFn)(self); | |
return result; | |
} | |
}; | |
} | |
pub const SpawnConfig = struct { | |
stack_size: usize = 16 * 1024 * 1024, | |
}; | |
pub fn spawn(config: SpawnConfig, comptime func: anytype, args: anytype) !*Handle(ReturnTypeOf(func)) { | |
return Backend.OsThread.spawn(config, func, args); | |
} | |
}; | |
const WindowsBackend = struct { | |
pub const OsLock = struct { | |
srwlock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT, | |
pub fn tryAcquire(self: *OsLock) bool { | |
const status = std.os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock); | |
return status != std.os.windows.FALSE; | |
} | |
pub fn acquire(self: *OsLock) void { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); | |
} | |
pub fn release(self: *OsLock) void { | |
std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock); | |
} | |
}; | |
pub const OsThread = struct { | |
alloc_ptr: *c_void, | |
heap_handle: std.os.windows.HANDLE, | |
thread_handle: std.os.windows.HANDLE, | |
pub fn spawn(config: Thread.SpawnConfig, comptime func: anytype, args: anytype) !*Thread.Handle(ReturnTypeOf(func)) { | |
const Args = @TypeOf(args); | |
const Closure = struct { | |
func_args: Args, | |
handle: Thread.Handle(ReturnTypeOf(func)), | |
fn entry(raw_arg: std.os.windows.LPVOID) callconv(.C) std.os.windows.DWORD { | |
const self = @ptrCast(*@This(), @alignCast(@alignOf(@This()), raw_arg)); | |
self.handle.result = @call(.{}, func, self.func_args); | |
return 0; | |
} | |
fn free(handle: *Thread.Handle(ReturnTypeOf(func))) void { | |
const self = @fieldParentPtr(@This(), "handle", handle); | |
std.os.windows.kernel32.HeapFree(self.heap_handle, 0, self.alloc_ptr); | |
} | |
}; | |
const heap_handle = std.os.windows.kernel32.GetProcessHeap() orelse return error.OutOfMemory; | |
const alloc_bytes = @alignOf(Closure) + @sizeOf(Closure); | |
const alloc_ptr = std.os.windows.kernel32.HeapAlloc(heap_handle, 0, alloc_bytes) orelse return error.OutOfMemory; | |
errdefer std.os.windows.kernel32.HeapFree(heap_handle, 0, alloc_ptr); | |
const closure = @intToPtr(*Closure, std.mem.alignForward(@ptrToInt(alloc_ptr), @alignOf(Closure))); | |
closure.* = .{ | |
.func_args = args, | |
.handle = .{ | |
.result = undefined, | |
.freeFn = Closure.free, | |
.thread_handle = .{ | |
.alloc_ptr = alloc_ptr, | |
.heap_handle = heap_handle, | |
.thread_handle = undefined, | |
}, | |
}, | |
}; | |
closure.handle.thread_handle.thread_handle = std.os.windows.kernel32.CreateThread( | |
null, | |
std.math.max(64 * 1024, config.stack_size), | |
Closure.entry, | |
@ptrCast(std.os.windows.LPVOID, closure), | |
0, | |
null, | |
) orelse return error.SpawnError; | |
return &closure.handle; | |
} | |
pub fn join(self: @This()) void { | |
const handle = self.thread_handle; | |
std.os.windows.WaitForSingleObjectEx(handle, std.os.windows.INFINITE, false) catch unreachable; | |
std.os.windows.CloseHandle(handle); | |
} | |
}; | |
}; | |
const PosixBackend = struct { | |
pub const OsLock = if (std.Target.current.isDarwin()) | |
DarwinLock | |
else | |
PosixLock; | |
const DarwinLock = struct { | |
state: usize = 0, | |
pub fn acquire(self: *DarwinLock) void { | |
} | |
pub fn release(self: *DarwinLock) void { | |
} | |
}; | |
const PosixLock = struct { | |
state: State = .Unlocked, | |
const State = enum(u8) { | |
Unlocked, | |
Locked, | |
Contended, | |
}; | |
pub fn tryAcquire(self: *PosixLock) bool { | |
return @cmpxchgStrong( | |
State, | |
&self.state, | |
.Unlocked, | |
.Locked, | |
.Acquire, | |
.Monotonic, | |
) == null;
} | |
pub fn acquire(self: *PosixLock) void { | |
} | |
fn acquireSlow(self: *PosixLock) void { | |
@setCold(true); | |
} | |
pub fn release(self: *PosixLock) void { | |
} | |
}; | |
pub const OsEvent = struct { | |
event_ptr: ?*PosixEvent = null, | |
pub fn wait(self: *OsEvent, timeout: ?u64) error{TimedOut}!void { | |
var posix_event: PosixEvent = undefined; | |
var has_posix_event = false; | |
defer if (has_posix_event) | |
posix_event.deinit(); | |
var event_ptr = @atomicLoad(?*PosixEvent, &self.event_ptr, .Acquire); | |
while (true) { | |
if (event_ptr != null) | |
return; | |
if (!has_posix_event) { | |
posix_event.init(); | |
has_posix_event = true; | |
} | |
event_ptr = @cmpxchgStrong( | |
?*PosixEvent, | |
&self.event_ptr, | |
event_ptr, | |
&posix_event, | |
.AcqRel, | |
.Acquire, | |
) orelse return posix_event.wait(timeout); | |
} | |
} | |
pub fn set(self: *OsEvent) void { | |
const notify_ptr = @intToPtr(*PosixEvent, @alignOf(PosixEvent)); | |
const event_ptr = @atomicRmw( | |
?*PosixEvent, | |
&self.event_ptr, | |
.Xchg, | |
notify_ptr, | |
.AcqRel, | |
); | |
const posix_event = event_ptr orelse return; | |
std.debug.assert(posix_event != notify_ptr); | |
posix_event.set(); | |
} | |
}; | |
const PosixEvent = struct { | |
is_set: bool, | |
cond: std.c.pthread_cond_t, | |
mutex: std.c.pthread_mutex_t, | |
fn init(self: *PosixEvent) void { | |
self.* = .{ | |
.is_set = false, | |
.cond = std.c.PTHREAD_COND_INITIALIZER, | |
.mutex = std.c.PTHREAD_MUTEX_INITIALIZER, | |
}; | |
} | |
fn deinit(self: *PosixEvent) void { | |
var rc = std.c.pthread_cond_destroy(&self.cond); | |
std.debug.assert(rc == 0 or rc == std.os.EINVAL); | |
rc = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(rc == 0 or rc == std.os.EINVAL); | |
} | |
fn wait(self: *PosixEvent, timeout: ?u64) error{TimedOut}!void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
// Turn the relative timeout into an absolute CLOCK_MONOTONIC deadline.
const deadline: ?u64 = if (timeout) |timeout_ns| timestamp(std.os.CLOCK_MONOTONIC) + timeout_ns else null;
while (!self.is_set) {
const deadline_ns = deadline orelse {
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0);
continue;
};
var now_ns = timestamp(std.os.CLOCK_MONOTONIC);
if (now_ns >= deadline_ns)
return error.TimedOut;
// pthread_cond_timedwait takes an absolute CLOCK_REALTIME timespec, so
// re-base the remaining monotonic delay onto the realtime clock.
now_ns = timestamp(std.os.CLOCK_REALTIME) + (deadline_ns - now_ns);
var ts: std.os.timespec = undefined;
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(now_ns, std.time.ns_per_s));
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(now_ns, std.time.ns_per_s));
const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
std.debug.assert(rc == 0 or rc == std.os.ETIMEDOUT);
}
}
fn timestamp(comptime clock: u32) u64 {
// Assumes std.os.clock_gettime works for both clocks on this target.
var ts: std.os.timespec = undefined;
std.os.clock_gettime(clock, &ts) catch return 0;
return (@intCast(u64, ts.tv_sec) * std.time.ns_per_s) + @intCast(u64, ts.tv_nsec);
}
fn set(self: *PosixEvent) void { | |
} | |
}; | |
const PosixFutex = struct { | |
}; | |
pub const OsThread = struct { | |
handle: std.c.pthread_t, | |
pub fn spawn(config: Thread.SpawnConfig, comptime func: anytype, args: anytype) !*Thread.Handle(ReturnTypeOf(func)) { | |
const Args = @TypeOf(args); | |
const Closure = struct { | |
func_args: Args, | |
handle: Thread.Handle(ReturnTypeOf(func)), | |
fn entry(raw_arg: ?*c_void) callconv(.C) ?*c_void { | |
const self = @ptrCast(*@This(), @alignCast(@alignOf(@This()), raw_arg)); | |
self.handle.result = @call(.{}, func, self.func_args); | |
return null; | |
} | |
fn free(handle: *Thread.Handle(ReturnTypeOf(func))) void { | |
const self = @fieldParentPtr(@This(), "handle", handle); | |
std.heap.c_allocator.destroy(self); | |
} | |
}; | |
const closure = try std.heap.c_allocator.create(Closure); | |
errdefer std.heap.c_allocator.destroy(closure); | |
closure.* = .{ | |
.func_args = args, | |
.handle = .{ | |
.result = undefined, | |
.freeFn = Closure.free, | |
.thread_handle = .{ | |
.handle = undefined, | |
}, | |
}, | |
}; | |
var attr: std.c.pthread_attr_t = undefined; | |
if (std.c.pthread_attr_init(&attr) != 0) | |
return error.SystemResources; | |
defer std.debug.assert(std.c.pthread_attr_destroy(&attr) == 0); | |
if (std.c.pthread_attr_setstacksize(&attr, config.stack_size) != 0) | |
return error.SystemResources; | |
const rc = std.c.pthread_create( | |
&closure.handle.thread_handle.handle, | |
&attr, | |
Closure.entry, | |
@ptrCast(?*c_void, closure), | |
); | |
return switch (rc) { | |
0 => return &closure.handle, | |
std.os.EAGAIN => return error.SystemResources, | |
std.os.EPERM => unreachable, | |
std.os.EINVAL => unreachable, | |
else => |err| return std.os.unexpectedErrno(@intCast(usize, err)), | |
}; | |
} | |
pub fn join(self: @This()) void { | |
switch (std.c.pthread_join(self.handle, null)) {
0 => {}, | |
std.os.EINVAL => unreachable, | |
std.os.ESRCH => unreachable, | |
std.os.EDEADLK => unreachable, | |
else => unreachable, | |
} | |
} | |
}; | |
}; | |
const LinuxBackend = @compileError("TODO"); | |
}; | |
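// Sketch of a parking-lot style wait table: addresses are hashed (Fibonacci
// hashing via the 0x9E3779B97F4A7C15 multiplier) into a fixed array of 256
// buckets, each guarded by its own pthread mutex and holding an intrusive
// doubly-linked list of Waiters. The Lookup/enqueue/wake paths are still
// unfinished in this base.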
fn WaitAddress(comptime Lock: type) type { | |
return struct { | |
const Waiter = struct { | |
address: usize, | |
prev: ?*Waiter, | |
next: ?*Waiter, | |
tail: ?*Waiter, | |
wakeFn: fn(*Waiter) void, | |
}; | |
const Bucket = struct { | |
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, | |
root: ?*Waiter = null, | |
waiting: usize = 0, | |
var buckets = [_]Bucket{.{}} ** 256; | |
fn get(address: usize) *Bucket { | |
const hash = address *% (0x9E3779B97F4A7C15 >> (64 - std.meta.bitCount(usize)));
const index = hash >> (std.meta.bitCount(usize) - @ctz(usize, buckets.len)); | |
return &buckets[index]; | |
} | |
const Lookup = struct { | |
};
}; | |
}; | |
} |
const std = @import("std"); | |
const Loop = @This(); | |
counter: usize = 0, | |
stack_size: usize, | |
max_workers: usize, | |
idle_lock: Lock = .{}, | |
idle_workers: ?*Worker = null, | |
spawned_workers: ?*Worker = null, | |
run_queues: [Priority.ARRAY_SIZE]GlobalQueue = [_]GlobalQueue{.{}} ** Priority.ARRAY_SIZE, | |
pub const RunConfig = struct { | |
max_threads: ?usize = null, | |
stack_size: ?usize = null, | |
}; | |
fn ReturnTypeOf(comptime asyncFn: anytype) type { | |
return @typeInfo(@TypeOf(asyncFn)).Fn.return_type.?; | |
} | |
pub fn run(config: RunConfig, comptime asyncFn: anytype, args: anytype) !ReturnTypeOf(asyncFn) { | |
const Args = @TypeOf(args); | |
const Result = ReturnTypeOf(asyncFn); | |
const Wrapper = struct { | |
fn entry(task: *Task, result: *?Result, arguments: Args) void { | |
suspend {
task.* = Task.init(@frame());
}
const res = @call(.{}, asyncFn, arguments);
result.* = res; // TODO: check if this hack is still needed
suspend {
Worker.getCurrent().?.loop.shutdown();
}
} | |
}; | |
var task: Task = undefined; | |
var result: ?Result = null; | |
var frame = async Wrapper.entry(&task, &result, args); | |
runTask(config, &task); | |
return result orelse error.AsyncFnDidNotComplete; | |
} | |
pub fn runTask(config: RunConfig, task: *Task) void { | |
if (std.builtin.single_threaded) | |
@compileError("TODO: specialize for single_threaded"); | |
const stack_size = blk: { | |
const stack_size = config.stack_size orelse 16 * 1024 * 1024; | |
break :blk std.math.max(std.mem.page_size, stack_size); | |
}; | |
const max_workers = blk: { | |
const max_threads = config.max_threads orelse std.Thread.cpuCount() catch 1; | |
break :blk std.math.max(1, max_threads); | |
}; | |
var loop = Loop{ | |
.max_workers = max_workers, | |
.stack_size = stack_size, | |
}; | |
defer loop.idle_lock.deinit(); | |
loop.run_queues[0].push(Batch.from(task)); | |
loop.counter = (Counter{ | |
.state = .waking, | |
.notified = false, | |
.idle = 0, | |
.spawned = 1, | |
}).pack(); | |
Worker.run(&loop, null); | |
} | |
pub const Task = struct { | |
next: ?*Task = undefined, | |
data: usize, | |
pub fn init(frame: anyframe) Task { | |
return .{ .data = @ptrToInt(frame) | 0 }; | |
} | |
pub const Callback = fn(*Task) callconv(.C) void; | |
pub fn initCallback(callback: Callback) Task {
return .{ .data = @ptrToInt(callback) | 1 }; | |
} | |
fn run(self: *Task) void { | |
switch (self.data & 1) { | |
0 => resume @intToPtr(anyframe, self.data), | |
1 => @intToPtr(Callback, self.data & ~@as(usize, 1))(self), | |
else => unreachable, | |
} | |
} | |
}; | |
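// Task priorities: Low/Normal/High map to the three run queue slots, while
// Handoff shares the High slot in toArrayIndex and is instead pushed onto the
// worker's private run_next batch in Worker.schedule, so it runs next on the
// same worker.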
pub const Priority = enum(u2) { | |
Low = 0, | |
Normal = 1, | |
High = 2, | |
Handoff = 3, | |
const ARRAY_SIZE = 3; | |
fn toArrayIndex(self: Priority) usize { | |
return switch (self) { | |
.Handoff, .High => 2, | |
.Normal => 1, | |
.Low => 0, | |
}; | |
} | |
}; | |
pub fn scheduleRemote(self: *Loop, task: *Task, priority: Priority) void { | |
self.run_queues[priority.toArrayIndex()].push(Batch.from(task)); | |
self.notifyWorkersWith(null); | |
} | |
pub fn schedule(task: *Task, priority: Priority) void { | |
const worker = Worker.getCurrent() orelse @panic("Loop.schedule called outside of worker thread pool"); | |
worker.schedule(Batch.from(task), priority); | |
} | |
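// Counter packs the whole scheduler state into one usize so it can be updated
// with a single atomic CAS. Layout (low to high bits): 2-bit State, 1-bit
// notified flag, then count_bits bits for the idle count and count_bits bits
// for the spawned count.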
const Counter = struct { | |
state: State = .pending, | |
notified: bool = false, | |
idle: usize = 0, | |
spawned: usize = 0, | |
const count_bits = @divFloor(std.meta.bitCount(usize) - 3, 2); | |
const Count = std.meta.Int(.unsigned, count_bits); | |
const State = enum(u2) { | |
pending = 0, | |
waking, | |
signaled, | |
shutdown, | |
}; | |
fn pack(self: Counter) usize { | |
return ( | |
(@as(usize, @enumToInt(self.state)) << 0) | | |
(@as(usize, @boolToInt(self.notified)) << 2) | | |
(@as(usize, @intCast(Count, self.idle)) << 3) | | |
(@as(usize, @intCast(Count, self.spawned)) << (3 + count_bits)) | |
); | |
} | |
fn unpack(value: usize) Counter { | |
return .{ | |
.state = @intToEnum(State, @truncate(u2, value)), | |
.notified = value & (1 << 2) != 0, | |
.idle = @truncate(Count, value >> 3), | |
.spawned = @truncate(Count, value >> (3 + count_bits)), | |
}; | |
} | |
}; | |
fn getWorkerCount(self: *const Loop) usize { | |
const counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
return counter.spawned; | |
} | |
fn getWorkerIter(self: *const Loop) WorkerIter { | |
const spawned_workers = @atomicLoad(?*Worker, &self.spawned_workers, .Acquire); | |
return .{ .spawned = spawned_workers }; | |
} | |
const WorkerIter = struct { | |
spawned: ?*Worker = null, | |
fn next(self: *WorkerIter) ?*Worker { | |
const worker = self.spawned orelse return null; | |
self.spawned = worker.spawned_next; | |
return worker; | |
} | |
}; | |
fn beginWorkerWith(self: *Loop, worker: *Worker) void { | |
var spawned_workers = @atomicLoad(?*Worker, &self.spawned_workers, .Monotonic); | |
while (true) { | |
worker.spawned_next = spawned_workers; | |
spawned_workers = @cmpxchgWeak( | |
?*Worker, | |
&self.spawned_workers, | |
spawned_workers, | |
worker, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
} | |
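// Wake-up policy: prefer signaling an already-idle worker, otherwise spawn a
// new one while under max_workers (retrying a bounded number of times), and
// as a last resort just set the notified bit so the next worker that checks
// the counter keeps running.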
fn notifyWorkersWith(self: *Loop, worker: ?*Worker) void { | |
var did_spawn = false; | |
var spawn_attempts_remaining: u8 = 5; | |
var is_waking = if (worker) |w| w.state == .waking else false; | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
if (counter.state == .shutdown) { | |
if (did_spawn) | |
self.markShutdown(); | |
return; | |
} | |
var is_spawning = false; | |
var is_signaling = false; | |
var new_counter = counter; | |
if (is_waking) { | |
std.debug.assert(counter.state == .waking); | |
if (counter.idle > 0) { | |
is_signaling = true; | |
new_counter.state = .signaled; | |
if (did_spawn) | |
new_counter.spawned -= 1; | |
} else if (spawn_attempts_remaining > 0 and (did_spawn or counter.spawned < self.max_workers)) { | |
is_spawning = true; | |
if (!did_spawn) | |
new_counter.spawned += 1; | |
} else { | |
new_counter.notified = true; | |
new_counter.state = .pending; | |
if (did_spawn) | |
new_counter.spawned -= 1; | |
} | |
} else { | |
if (counter.state == .pending and counter.idle > 0) { | |
is_signaling = true; | |
new_counter.state = .signaled; | |
} else if (counter.state == .pending and counter.spawned < self.max_workers) { | |
is_spawning = true; | |
new_counter.state = .waking; | |
new_counter.spawned += 1; | |
} else if (!counter.notified) { | |
new_counter.notified = true; | |
} else { | |
return; | |
} | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
is_waking = true; | |
if (is_signaling) { | |
self.notifyOne(); | |
return; | |
} | |
if (is_spawning) { | |
did_spawn = true; | |
if (Worker.spawn(self)) | |
return; | |
} else { | |
return; | |
} | |
std.Thread.spinLoopHint(); | |
spawn_attempts_remaining -= 1; | |
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
} | |
} | |
fn suspendWorkerWith(self: *Loop, worker: *Worker) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
if (counter.state == .shutdown) { | |
worker.state = .shutdown; | |
self.markShutdown(); | |
return; | |
} | |
if (counter.notified or counter.state == .signaled or worker.state != .waiting) { | |
var new_counter = counter; | |
new_counter.notified = false; | |
if (counter.state == .signaled) { | |
new_counter.state = .waking; | |
if (worker.state == .waiting) | |
new_counter.idle -= 1; | |
} else if (counter.notified) { | |
if (worker.state == .waking) | |
new_counter.state = .waking; | |
if (worker.state == .waiting) | |
new_counter.idle -= 1; | |
} else { | |
if (worker.state == .waking) | |
new_counter.state = .pending; | |
if (worker.state != .waiting) | |
new_counter.idle += 1; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
if (counter.notified or counter.state == .signaled) { | |
if (counter.state == .signaled) { | |
worker.state = .waking; | |
} else if (worker.state == .waiting) { | |
worker.state = .running; | |
} | |
return; | |
} | |
} | |
worker.state = .waiting; | |
self.wait(worker); | |
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
} | |
} | |
fn shutdown(self: *Loop) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (counter.state != .shutdown) { | |
var new_counter = counter; | |
new_counter.state = .shutdown; | |
new_counter.idle = 0; | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
if (counter.idle > 0) | |
self.notifyAll(); | |
return; | |
} | |
} | |
fn markShutdown(self: *Loop) void { | |
var counter = Counter{ .spawned = 1 }; | |
counter = Counter.unpack(@atomicRmw(usize, &self.counter, .Sub, counter.pack(), .AcqRel)); | |
std.debug.assert(counter.state == .shutdown); | |
if (counter.spawned == 1 and counter.idle != 0) { | |
self.notifyOne(); | |
} | |
} | |
fn joinWith(self: *Loop, worker: *Worker) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Acquire)); | |
while (true) { | |
if (counter.spawned == 0) | |
break; | |
if (counter.idle == 0) { | |
var new_counter = counter; | |
new_counter.idle = 1; | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.Acquire, | |
.Acquire, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
} | |
self.wait(worker); | |
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Acquire)); | |
} | |
var pending_workers = self.getWorkerIter(); | |
while (pending_workers.next()) |pending_worker| { | |
const thread = pending_worker.thread orelse continue; | |
pending_worker.notify(); | |
thread.join(); | |
} | |
} | |
fn wait(self: *Loop, worker: *Worker) void { | |
const is_waiting = blk: { | |
const held = self.idle_lock.acquire(); | |
defer held.release(); | |
const counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
const should_wake = switch (worker.state) { | |
.shutdown => counter.spawned == 0, | |
.waiting => counter.notified or counter.state == .signaled or counter.state == .shutdown, | |
else => unreachable, | |
}; | |
if (should_wake) { | |
break :blk false; | |
} | |
worker.idle_next = self.idle_workers; | |
self.idle_workers = worker; | |
break :blk true; | |
}; | |
if (is_waiting) { | |
worker.wait(); | |
} | |
} | |
fn notifyOne(self: *Loop) void { | |
self.wake(false); | |
} | |
fn notifyAll(self: *Loop) void { | |
self.wake(true); | |
} | |
fn wake(self: *Loop, wake_all: bool) void { | |
const worker = blk: { | |
const held = self.idle_lock.acquire(); | |
defer held.release(); | |
const worker = self.idle_workers orelse { | |
return; | |
}; | |
if (wake_all) { | |
self.idle_workers = null; | |
} else { | |
self.idle_workers = worker.idle_next; | |
worker.idle_next = null; | |
} | |
break :blk worker; | |
}; | |
var idle_workers: ?*Worker = worker; | |
while (idle_workers) |idle_worker| { | |
idle_workers = idle_worker.idle_next; | |
idle_worker.notify(); | |
} | |
} | |
const Worker = struct { | |
loop: *Loop, | |
thread: ?Thread, | |
state: State = .waking, | |
wait_state: usize = 0, | |
idle_next: ?*Worker = null, | |
spawned_next: ?*Worker = null, | |
steal_targets: WorkerIter = .{}, | |
run_next: Batch = .{}, | |
run_queues: [Priority.ARRAY_SIZE]LocalQueue = [_]LocalQueue{.{}} ** Priority.ARRAY_SIZE, | |
const ThreadLocalWorkerPtr = ThreadLocalUsize(struct{}); | |
const State = enum { | |
running, | |
waking, | |
waiting, | |
shutdown, | |
}; | |
fn spawn(loop: *Loop) bool { | |
const Spawner = struct { | |
_loop: *Loop = undefined, | |
_thread: Thread = undefined, | |
put_event: Event = .{}, | |
got_event: Event = .{}, | |
fn entry(self: *@This()) void { | |
std.debug.assert(self.put_event.wait(null)); | |
const _loop = self._loop; | |
const _thread = self._thread; | |
self.got_event.set(); | |
Worker.run(_loop, _thread); | |
} | |
}; | |
var spawner = Spawner{}; | |
defer { | |
spawner.put_event.deinit(); | |
spawner.got_event.deinit(); | |
} | |
spawner._loop = loop; | |
spawner._thread = Thread.spawn(loop.stack_size, &spawner, Spawner.entry) catch return false; | |
spawner.put_event.set(); | |
std.debug.assert(spawner.got_event.wait(null)); | |
return true; | |
} | |
fn run(loop: *Loop, thread: ?Thread) void { | |
var self = Worker{ | |
.loop = loop, | |
.thread = thread, | |
}; | |
loop.beginWorkerWith(&self); | |
ThreadLocalWorkerPtr.set(@ptrToInt(&self)); | |
var tick = @truncate(u8, @ptrToInt(&self) >> @sizeOf(*Worker)); | |
while (self.state != .shutdown) { | |
tick +%= 1; | |
if (self.poll(tick)) |task| { | |
if (self.state == .waking) | |
loop.notifyWorkersWith(&self); | |
self.state = .running; | |
task.run(); | |
continue; | |
} | |
loop.suspendWorkerWith(&self); | |
} | |
if (thread == null) { | |
loop.joinWith(&self); | |
} else { | |
self.wait(); | |
} | |
} | |
fn getCurrent() ?*Worker { | |
return @intToPtr(?*Worker, ThreadLocalWorkerPtr.get()); | |
} | |
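// Worker parking uses a tagged word: the low 2 bits of wait_state hold a
// WaitState and the remaining bits hold a pointer to a stack-allocated Event
// (hence the align(4) below, which guarantees those two bits are free).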
const WaitState = enum(u2) { | |
Empty = 0, | |
Waiting = 1, | |
Notified = 3, | |
}; | |
fn wait(self: *Worker) void { | |
var event align(std.math.max(@alignOf(Event), 4)) = Event{}; | |
defer event.deinit(); | |
if (@cmpxchgStrong( | |
usize, | |
&self.wait_state, | |
@enumToInt(WaitState.Empty), | |
@enumToInt(WaitState.Waiting) | @ptrToInt(&event), | |
.AcqRel, | |
.Acquire, | |
)) |updated| { | |
std.debug.assert(@intToEnum(WaitState, @truncate(u2, updated)) == .Notified); | |
@atomicStore(usize, &self.wait_state, @enumToInt(WaitState.Empty), .Monotonic); | |
return; | |
} | |
const timeout: ?u64 = null; | |
const timed_out = !event.wait(timeout); | |
std.debug.assert(!timed_out); | |
} | |
fn notify(self: *Worker) void { | |
var wait_state = @atomicLoad(usize, &self.wait_state, .Monotonic); | |
while (true) { | |
var new_state: WaitState = undefined; | |
switch (@intToEnum(WaitState, @truncate(u2, wait_state))) { | |
.Empty => new_state = .Notified, | |
.Waiting => new_state = .Empty, | |
.Notified => return, | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.wait_state, | |
wait_state, | |
@enumToInt(new_state), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
wait_state = updated; | |
continue; | |
} | |
if (new_state == .Empty) { | |
const EventPtr = *align(std.math.max(@alignOf(Event), 4)) Event; | |
const event = @intToPtr(EventPtr, wait_state & ~@as(usize, 0b11)); | |
event.set(); | |
} | |
return; | |
} | |
} | |
fn schedule(self: *Worker, batch: Batch, priority: Priority) void { | |
if (batch.isEmpty()) | |
return; | |
if (priority == .Handoff) { | |
self.run_next.push(batch); | |
} else { | |
self.run_queues[priority.toArrayIndex()].push(batch); | |
} | |
self.loop.notifyWorkersWith(null); | |
} | |
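// Polling order is tick-based to balance throughput and fairness: roughly
// every 127 ticks check other workers first, every 61 ticks check the global
// queues first, otherwise drain the local queues and then fall back to a few
// global/steal attempts.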
fn poll(self: *Worker, tick: u8) ?*Task { | |
if (tick % 127 == 0) { | |
if (self.pollSteal(null)) |task| | |
return task; | |
} | |
if (tick % 61 == 0) { | |
if (self.pollGlobal(null)) |task| | |
return task; | |
} | |
if (self.pollLocal(tick)) |task| | |
return task; | |
var steal_attempt: u8 = 0; | |
while (steal_attempt < 4) : (steal_attempt += 1) { | |
if (self.pollGlobal(steal_attempt)) |task| | |
return task; | |
if (self.pollSteal(steal_attempt)) |task| | |
return task; | |
} | |
return null; | |
} | |
fn pollLocal(self: *Worker, tick: u8) ?*Task { | |
var priority_order = [_]Priority{ .Handoff, .High, .Normal, .Low }; | |
if (tick % 61 == 0) { | |
priority_order = [_]Priority{ .Low, .Normal, .High, .Handoff }; | |
} | |
for (priority_order) |priority| { | |
if (priority == .Handoff) { | |
if (self.run_next.pop()) |task| | |
return task; | |
} | |
const be_fair = tick % 31 == 0; | |
if (self.run_queues[priority.toArrayIndex()].pop(be_fair)) |task| | |
return task; | |
} | |
return null; | |
} | |
fn pollGlobal(self: *Worker, steal_attempt: ?u8) ?*Task { | |
return self.pollQueues(&self.loop.run_queues, "popAndStealGlobal", steal_attempt); | |
} | |
fn pollSteal(self: *Worker, steal_attempt: ?u8) ?*Task { | |
var iter = self.loop.getWorkerCount(); | |
while (iter > 0) : (iter -= 1) { | |
const target = self.steal_targets.next() orelse blk: { | |
self.steal_targets = self.loop.getWorkerIter(); | |
break :blk self.steal_targets.next() orelse unreachable; | |
}; | |
if (target == self) | |
continue; | |
if (self.pollQueues(&target.run_queues, "popAndStealLocal", steal_attempt)) |task| | |
return task; | |
} | |
return null; | |
} | |
fn pollQueues( | |
self: *Worker, | |
target_queues: anytype, | |
comptime popAndStealFn: []const u8, | |
steal_attempt: ?u8, | |
) ?*Task { | |
const priority_order: []const Priority = blk: { | |
const attempt = steal_attempt orelse { | |
break :blk &[_]Priority{ .Low, .Normal, .High }; | |
}; | |
break :blk switch (attempt) { | |
0 => &[_]Priority{ .High }, | |
1 => &[_]Priority{ .High, .Normal }, | |
else => &[_]Priority{ .High, .Normal, .Low }, | |
}; | |
}; | |
for (priority_order) |priority| { | |
const be_fair = steal_attempt == null; | |
const local_queue = &self.run_queues[priority.toArrayIndex()]; | |
const target_queue = &target_queues[priority.toArrayIndex()]; | |
if (@field(local_queue, popAndStealFn)(be_fair, target_queue)) |task| | |
return task; | |
} | |
return null; | |
} | |
}; | |
fn ThreadLocalUsize(comptime UniqueKey: type) type { | |
const is_apple_silicon = std.Target.current.isDarwin() and std.builtin.arch == .aarch64; | |
// For normal platforms, we use the compilers built in "threadlocal" keyword. | |
if (!is_apple_silicon) { | |
return struct { | |
threadlocal var tls_value: usize = 0; | |
pub fn get() usize { | |
return tls_value; | |
} | |
pub fn set(value: usize) void { | |
tls_value = value; | |
} | |
}; | |
} | |
// For Apple Silicon, LLD currently has some issues with it which prevents the threadlocal keyword from work correctly. | |
// So for now we fallback to the OS provided thread local mechanics. | |
return struct { | |
const pthread_key_t = c_ulong; | |
const pthread_once_t = extern struct { | |
__sig: c_long = 0x30B1BCBA, | |
__opaque: [4]u8 = [_]u8{ 0, 0, 0, 0 }, | |
}; | |
extern "c" fn pthread_once(o: *pthread_once_t, f: ?fn() callconv(.C) void) callconv(.C) c_int; | |
extern "c" fn pthread_key_create(k: *pthread_key_t, d: ?fn(?*c_void) callconv(.C) void) callconv(.C) c_int; | |
extern "c" fn pthread_setspecific(k: pthread_key_t, p: ?*c_void) callconv(.C) c_int; | |
extern "c" fn pthread_getspecific(k: pthread_key_t) callconv(.C) ?*c_void; | |
var tls_key: pthread_key_t = undefined; | |
var tls_key_once: pthread_once_t = .{}; | |
fn tls_init() callconv(.C) void { | |
std.debug.assert(pthread_key_create(&tls_key, null) == 0); | |
} | |
pub fn get() usize { | |
std.debug.assert(pthread_once(&tls_key_once, tls_init) == 0); | |
return @ptrToInt(pthread_getspecific(tls_key)); | |
} | |
pub fn set(value: usize) void { | |
std.debug.assert(pthread_once(&tls_key_once, tls_init) == 0); | |
std.debug.assert(pthread_setspecific(tls_key, @intToPtr(?*c_void, value)) == 0); | |
} | |
}; | |
} | |
const Batch = struct { | |
head: ?*Task = null, | |
tail: *Task = undefined, | |
fn from(task: *Task) Batch { | |
task.next = null; | |
return Batch{ | |
.head = task, | |
.tail = task, | |
}; | |
} | |
fn isEmpty(self: Batch) bool { | |
return self.head == null; | |
} | |
fn push(self: *Batch, batch: Batch) void { | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else if (!batch.isEmpty()) { | |
self.tail.next = batch.head; | |
self.tail = batch.tail; | |
} | |
} | |
fn pop(self: *Batch) ?*Task { | |
const task = self.head orelse return null; | |
self.head = task.next; | |
return task; | |
} | |
}; | |
const GlobalQueue = UnboundedQueue; | |
const LocalQueue = struct { | |
buffer: BoundedQueue = .{}, | |
overflow: UnboundedQueue = .{}, | |
fn push(self: *LocalQueue, batch: Batch) void { | |
if (self.buffer.push(batch)) |overflowed| | |
self.overflow.push(overflowed); | |
} | |
fn pop(self: *LocalQueue, be_fair: bool) ?*Task { | |
if (be_fair) { | |
if (self.buffer.pop()) |task| | |
return task; | |
} | |
if (self.buffer.popAndStealUnbounded(&self.overflow)) |task| | |
return task; | |
if (self.buffer.pop()) |task| | |
return task; | |
return null; | |
} | |
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task { | |
return self.buffer.popAndStealUnbounded(target); | |
} | |
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task { | |
if (self == target) | |
return self.pop(be_fair); | |
if (be_fair) { | |
if (self.buffer.popAndStealBounded(&target.buffer)) |task| | |
return task; | |
} | |
if (self.buffer.popAndStealUnbounded(&target.overflow)) |task| | |
return task; | |
if (self.buffer.popAndStealBounded(&target.buffer)) |task| | |
return task; | |
return null; | |
} | |
}; | |
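// UnboundedQueue is an intrusive multi-producer single-consumer list in the
// style of Vyukov's MPSC queue: producers exchange head and link through
// Task.next, while the low bit of tail acts as a consumer lock so only one
// worker pops at a time (see tryAcquireConsumer).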
const UnboundedQueue = struct { | |
head: ?*Task = null, | |
tail: usize = 0, | |
stub: Task = .{ | |
.next = null, | |
.data = undefined, | |
}, | |
fn push(self: *UnboundedQueue, batch: Batch) void { | |
if (batch.isEmpty()) | |
return; | |
const head = @atomicRmw(?*Task, &self.head, .Xchg, batch.tail, .AcqRel); | |
const prev = head orelse &self.stub; | |
@atomicStore(?*Task, &prev.next, batch.head, .Release); | |
} | |
fn tryAcquireConsumer(self: *UnboundedQueue) ?Consumer { | |
var tail = @atomicLoad(usize, &self.tail, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
const head = @atomicLoad(?*Task, &self.head, .Monotonic); | |
if (head == null or head == &self.stub) | |
return null; | |
if (tail & 1 != 0) | |
return null; | |
tail = @cmpxchgWeak( | |
usize, | |
&self.tail, | |
tail, | |
tail | 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return Consumer{ | |
.queue = self, | |
.tail = @intToPtr(?*Task, tail) orelse &self.stub, | |
}; | |
} | |
} | |
const Consumer = struct { | |
queue: *UnboundedQueue, | |
tail: *Task, | |
fn release(self: Consumer) void { | |
@atomicStore(usize, &self.queue.tail, @ptrToInt(self.tail), .Release); | |
} | |
fn pop(self: *Consumer) ?*Task { | |
var tail = self.tail; | |
var next = @atomicLoad(?*Task, &tail.next, .Acquire); | |
if (tail == &self.queue.stub) { | |
tail = next orelse return null; | |
self.tail = tail; | |
next = @atomicLoad(?*Task, &tail.next, .Acquire); | |
} | |
if (next) |task| { | |
self.tail = task; | |
return tail; | |
} | |
const head = @atomicLoad(?*Task, &self.queue.head, .Monotonic); | |
if (tail != head) { | |
return null; | |
} | |
self.queue.push(Batch.from(&self.queue.stub)); | |
if (@atomicLoad(?*Task, &tail.next, .Acquire)) |task| { | |
self.tail = task; | |
return tail; | |
} | |
return null; | |
} | |
}; | |
}; | |
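// BoundedQueue is a fixed 64-slot ring buffer: only the owning worker moves
// tail (push and steal-into), while other workers steal by CASing head. When
// a push finds the buffer full it migrates half of it into the overflow
// UnboundedQueue and returns the remainder to the caller.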
const BoundedQueue = struct { | |
head: Pos = 0, | |
tail: Pos = 0, | |
buffer: [capacity]*Task = undefined, | |
const Pos = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2); | |
const capacity = 64; | |
comptime { | |
std.debug.assert(capacity <= std.math.maxInt(Pos)); | |
} | |
fn push(self: *BoundedQueue, _batch: Batch) ?Batch { | |
var batch = _batch; | |
if (batch.isEmpty()) { | |
return null; | |
} | |
var tail = self.tail; | |
var head = @atomicLoad(Pos, &self.head, .Monotonic); | |
while (true) { | |
if (batch.isEmpty()) | |
return null; | |
if (tail -% head < self.buffer.len) { | |
while (tail -% head < self.buffer.len) { | |
const task = batch.pop() orelse break; | |
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(Pos, &self.tail, tail, .Release); | |
std.Thread.spinLoopHint(); | |
head = @atomicLoad(Pos, &self.head, .Monotonic); | |
continue; | |
} | |
const new_head = head +% @intCast(Pos, self.buffer.len / 2); | |
if (@cmpxchgWeak( | |
Pos, | |
&self.head, | |
head, | |
new_head, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
head = updated; | |
continue; | |
} | |
var overflowed = Batch{}; | |
while (head != new_head) : (head +%= 1) { | |
const task = self.buffer[head % self.buffer.len]; | |
overflowed.push(Batch.from(task)); | |
} | |
overflowed.push(batch); | |
return overflowed; | |
} | |
} | |
fn pop(self: *BoundedQueue) ?*Task { | |
var tail = self.tail; | |
var head = @atomicLoad(Pos, &self.head, .Monotonic); | |
while (tail != head) { | |
head = @cmpxchgWeak( | |
Pos, | |
&self.head, | |
head, | |
head +% 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return self.buffer[head % self.buffer.len]; | |
std.Thread.spinLoopHint(); | |
} | |
return null; | |
} | |
fn popAndStealUnbounded(self: *BoundedQueue, target: *UnboundedQueue) ?*Task { | |
var consumer = target.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
const tail = self.tail; | |
const head = @atomicLoad(Pos, &self.head, .Monotonic); | |
var new_tail = tail; | |
var first_task: ?*Task = null; | |
while (first_task == null or (new_tail -% head < self.buffer.len)) { | |
const task = consumer.pop() orelse break; | |
if (first_task == null) { | |
first_task = task; | |
} else { | |
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered); | |
new_tail +%= 1; | |
} | |
} | |
if (new_tail != tail) | |
@atomicStore(Pos, &self.tail, new_tail, .Release); | |
return first_task; | |
} | |
fn popAndStealBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Task { | |
if (self == target) | |
return self.pop(); | |
const head = @atomicLoad(Pos, &self.head, .Monotonic); | |
const tail = self.tail; | |
if (tail != head) | |
return self.pop(); | |
var target_head = @atomicLoad(Pos, &target.head, .Monotonic); | |
while (true) { | |
const target_tail = @atomicLoad(Pos, &target.tail, .Acquire); | |
const target_size = target_tail -% target_head; | |
if (target_size == 0) | |
return null; | |
var steal = target_size - (target_size / 2); | |
if (steal > target.buffer.len / 2) { | |
std.Thread.spinLoopHint(); | |
target_head = @atomicLoad(Pos, &target.head, .Monotonic); | |
continue; | |
} | |
const first_task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered); | |
var new_target_head = target_head +% 1; | |
var new_tail = tail; | |
steal -= 1; | |
while (steal > 0) : (steal -= 1) { | |
const task = @atomicLoad(*Task, &target.buffer[new_target_head % target.buffer.len], .Unordered); | |
new_target_head +%= 1; | |
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered); | |
new_tail +%= 1; | |
} | |
if (@cmpxchgWeak( | |
Pos, | |
&target.head, | |
target_head, | |
new_target_head, | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
target_head = updated; | |
continue; | |
} | |
if (new_tail != tail) | |
@atomicStore(Pos, &self.tail, new_tail, .Release); | |
return first_task; | |
} | |
} | |
}; | |
const Lock = if (std.builtin.os.tag == .windows) | |
WindowsLock | |
else if (std.Target.current.isDarwin()) | |
DarwinLock | |
else if (std.builtin.link_libc) | |
PosixLock | |
else if (std.builtin.os.tag == .linux) | |
LinuxLock | |
else | |
@compileError("Unimplemented Lock primitive for platform"); | |
const WindowsLock = struct { | |
srwlock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock.srwlock); | |
} | |
}; | |
}; | |
const DarwinLock = extern struct { | |
os_unfair_lock: u32 = 0, | |
const Self = @This(); | |
extern fn os_unfair_lock_lock(lock: *Self) void; | |
extern fn os_unfair_lock_unlock(lock: *Self) void; | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
os_unfair_lock_lock(self); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
os_unfair_lock_unlock(self.lock); | |
} | |
}; | |
}; | |
const PosixLock = struct { | |
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
const rc = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(rc == 0 or rc == std.os.EINVAL); | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
std.debug.assert(std.c.pthread_mutex_unlock(&self.lock.mutex) == 0); | |
} | |
}; | |
}; | |
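// LinuxLock is a three-state futex mutex (unlocked/locked/contended): acquire
// spins briefly on locked, then marks the word contended and futex-waits;
// release only issues a futex wake when the word was contended.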
const LinuxLock = struct { | |
state: State = .unlocked, | |
const Self = @This(); | |
const State = enum(i32) { | |
unlocked = 0, | |
locked, | |
contended, | |
}; | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
if (@cmpxchgWeak( | |
State, | |
&self.state, | |
.unlocked, | |
.locked, | |
.Acquire, | |
.Monotonic, | |
)) |_| { | |
self.acquireSlow(); | |
} | |
return Held{ .lock = self }; | |
} | |
fn acquireSlow(self: *Self) void { | |
@setCold(true); | |
var spin: usize = 0; | |
var lock_state = State.locked; | |
var state = @atomicLoad(State, &self.state, .Monotonic); | |
while (true) { | |
if (state == .unlocked) { | |
state = @cmpxchgWeak( | |
State, | |
&self.state, | |
state, | |
lock_state, | |
.Acquire, | |
.Monotonic, | |
) orelse return; | |
std.Thread.spinLoopHint(); | |
continue; | |
} | |
if (state == .locked and spin < 100) { | |
spin += 1; | |
std.Thread.spinLoopHint(); | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
} | |
if (state != .contended) { | |
if (@cmpxchgWeak( | |
State, | |
&self.state, | |
state, | |
.contended, | |
.Monotonic, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
state = updated; | |
continue; | |
} | |
} | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@enumToInt(State.contended), | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => unreachable, | |
else => unreachable, | |
} | |
spin = 0; | |
lock_state = .contended; | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
} | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
switch (@atomicRmw(State, &self.lock.state, .Xchg, .unlocked, .Release)) { | |
.unlocked => unreachable, | |
.locked => {}, | |
.contended => self.releaseSlow(), | |
} | |
} | |
fn releaseSlow(self: Held) void { | |
@setCold(true); | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, &self.lock.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
1, | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
}; | |
}; | |
const Event = if (std.builtin.os.tag == .windows) | |
WindowsEvent | |
else if (std.builtin.link_libc) | |
PosixEvent | |
else if (std.builtin.os.tag == .linux) | |
LinuxEvent | |
else | |
@compileError("Unimplemented Event primitive for platform"); | |
const WindowsEvent = struct { | |
is_set: bool = false, | |
lock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT, | |
cond: std.os.windows.CONDITION_VARIABLE = std.os.windows.CONDITION_VARIABLE_INIT, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn set(self: *Self) void { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock); | |
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock); | |
self.is_set = true; | |
std.os.windows.kernel32.WakeConditionVariable(&self.cond); | |
} | |
threadlocal var tls_frequency: u64 = 0; | |
pub fn wait(self: *Self, timeout: ?u64) bool { | |
var counter: u64 = undefined; | |
var frequency: u64 = undefined; | |
if (timeout) |timeout_ns| { | |
counter = std.os.windows.QueryPerformanceCounter(); | |
frequency = tls_frequency; | |
if (frequency == 0) { | |
frequency = std.os.windows.QueryPerformanceFrequency(); | |
tls_frequency = frequency; | |
} | |
} | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock); | |
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock); | |
while (!self.is_set) { | |
var timeout_ms: std.os.windows.DWORD = std.os.windows.INFINITE; | |
if (timeout) |timeout_ns| { | |
const elapsed = blk: { | |
const now = std.os.windows.QueryPerformanceCounter(); | |
const a = if (now >= counter) (now - counter) else 0; | |
const b = std.time.ns_per_s; | |
const c = frequency; | |
break :blk (((a / c) * b) + ((a % c) * b) / c); | |
}; | |
if (elapsed > timeout_ns) { | |
return false; | |
} else { | |
const delay_ms = @divFloor(timeout_ns - elapsed, std.time.ns_per_ms); | |
timeout_ms = std.math.cast(std.os.windows.DWORD, delay_ms) catch timeout_ms; | |
} | |
} | |
const status = std.os.windows.kernel32.SleepConditionVariableSRW( | |
&self.cond, | |
&self.lock, | |
timeout_ms, | |
0, | |
); | |
if (status == std.os.windows.FALSE) { | |
switch (std.os.windows.kernel32.GetLastError()) { | |
.TIMEOUT => {}, | |
else => |err| { | |
const e = std.os.windows.unexpectedError(err); | |
unreachable; | |
}, | |
} | |
} | |
} | |
return true; | |
} | |
}; | |
const PosixEvent = struct { | |
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, | |
cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, | |
is_set: bool = false, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
const m = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(m == 0 or m == std.os.EINVAL); | |
const c = std.c.pthread_cond_destroy(&self.cond); | |
std.debug.assert(c == 0 or c == std.os.EINVAL); | |
self.* = undefined; | |
} | |
pub fn set(self: *Self) void { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0); | |
self.is_set = true; | |
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0); | |
} | |
pub fn wait(self: *Self, timeout: ?u64) bool { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0); | |
var deadline: ?u64 = null; | |
if (timeout) |timeout_ns| { | |
deadline = timestamp(std.os.CLOCK_MONOTONIC) + timeout_ns; | |
} | |
while (!self.is_set) { | |
const deadline_ns = deadline orelse { | |
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0); | |
continue; | |
}; | |
var now_ns = timestamp(std.os.CLOCK_MONOTONIC);
if (now_ns >= deadline_ns) {
return false;
} else {
// pthread_cond_timedwait wants an absolute CLOCK_REALTIME deadline, so
// re-base the remaining monotonic delay onto the realtime clock.
now_ns = timestamp(std.os.CLOCK_REALTIME) + (deadline_ns - now_ns);
}
var ts: std.os.timespec = undefined; | |
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(now_ns, std.time.ns_per_s)); | |
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(now_ns, std.time.ns_per_s)); | |
const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts); | |
std.debug.assert(rc == 0 or rc == std.os.ETIMEDOUT); | |
} | |
return true; | |
} | |
fn timestamp(comptime clock: u32) u64 { | |
if (comptime std.Target.current.isDarwin()) { | |
switch (clock) { | |
std.os.CLOCK_REALTIME => { | |
var tv: std.os.timeval = undefined; | |
std.os.gettimeofday(&tv, null); | |
return (@intCast(u64, tv.tv_sec) * std.time.ns_per_s) + (@intCast(u64, tv.tv_usec) * std.time.ns_per_us); | |
}, | |
std.os.CLOCK_MONOTONIC => { | |
var info: std.os.darwin.mach_timebase_info_data = undefined; | |
std.os.darwin.mach_timebase_info(&info); | |
var counter = std.os.darwin.mach_absolute_time(); | |
if (info.numer > 1) | |
counter *= info.numer; | |
if (info.denom > 1) | |
counter /= info.denom; | |
return counter; | |
}, | |
else => unreachable, | |
} | |
} | |
var ts: std.os.timespec = undefined; | |
std.os.clock_gettime(clock, &ts) catch return 0; | |
return (@intCast(u64, ts.tv_sec) * std.time.ns_per_s) + @intCast(u64, ts.tv_nsec); | |
} | |
}; | |
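// LinuxEvent is a one-shot flag backed by a futex word: set() stores .set and
// wakes one waiter, while wait() futex-waits as long as the word is still
// .unset, re-computing the remaining timeout from CLOCK_MONOTONIC on each
// wakeup.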
const LinuxEvent = struct { | |
state: State = .unset, | |
const Self = @This(); | |
const State = enum(i32) { | |
unset = 0, | |
set, | |
}; | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn set(self: *Self) void { | |
@atomicStore(State, &self.state, .set, .Release); | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
1, | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
pub fn wait(self: *Self, timeout: ?u64) bool { | |
var deadline: ?u64 = null; | |
while (true) { | |
if (@atomicLoad(State, &self.state, .Acquire) == .set) | |
return true; | |
var ts: std.os.timespec = undefined; | |
var ts_ptr: ?*std.os.timespec = null; | |
if (timeout) |timeout_ns| { | |
const delay_ns = delay: { | |
std.os.clock_gettime(std.os.CLOCK_MONOTONIC, &ts) catch return false; | |
const now_ns = (@intCast(u64, ts.tv_sec) * std.time.ns_per_s) + @intCast(u64, ts.tv_nsec); | |
if (deadline) |deadline_ns| { | |
if (now_ns >= deadline_ns) | |
return false; | |
break :delay (deadline_ns - now_ns); | |
} else { | |
deadline = now_ns + timeout_ns; | |
break :delay timeout_ns; | |
} | |
}; | |
ts_ptr = &ts; | |
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(delay_ns, std.time.ns_per_s)); | |
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(delay_ns, std.time.ns_per_s)); | |
} | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@enumToInt(State.unset), | |
ts_ptr, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => return false, | |
else => unreachable, | |
} | |
} | |
} | |
}; | |
const Thread = if (std.builtin.os.tag == .windows) | |
WindowsThread | |
else if (std.builtin.link_libc) | |
PosixThread | |
else if (std.builtin.os.tag == .linux) | |
LinuxThread | |
else | |
@compileError("Unimplemented Thread primitive for platform"); | |
const WindowsThread = struct { | |
handle: std.os.windows.HANDLE, | |
const Self = @This(); | |
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self { | |
const Context = @TypeOf(context); | |
const Wrapper = struct { | |
fn entry(raw_arg: std.os.windows.LPVOID) callconv(.C) std.os.windows.DWORD { | |
entryFn(@ptrCast(Context, @alignCast(@alignOf(Context), raw_arg))); | |
return 0; | |
} | |
}; | |
const handle = std.os.windows.kernel32.CreateThread( | |
null, | |
stack_size, | |
Wrapper.entry, | |
@ptrCast(std.os.windows.LPVOID, context), | |
0, | |
null, | |
) orelse return error.SpawnError; | |
return Self{ .handle = handle }; | |
} | |
pub fn join(self: Self) void { | |
std.os.windows.WaitForSingleObjectEx(self.handle, std.os.windows.INFINITE, false) catch unreachable; | |
std.os.windows.CloseHandle(self.handle); | |
} | |
}; | |
const PosixThread = struct { | |
handle: std.c.pthread_t, | |
const Self = @This(); | |
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self { | |
const Context = @TypeOf(context); | |
const Wrapper = struct { | |
fn entry(raw_arg: ?*c_void) callconv(.C) ?*c_void { | |
entryFn(@ptrCast(Context, @alignCast(@alignOf(Context), raw_arg))); | |
return null; | |
} | |
}; | |
var attr: std.c.pthread_attr_t = undefined; | |
if (std.c.pthread_attr_init(&attr) != 0) | |
return error.SystemResources; | |
defer std.debug.assert(std.c.pthread_attr_destroy(&attr) == 0); | |
if (std.c.pthread_attr_setstacksize(&attr, stack_size) != 0) | |
return error.SystemResources; | |
var handle: std.c.pthread_t = undefined; | |
const rc = std.c.pthread_create( | |
&handle, | |
&attr, | |
Wrapper.entry, | |
@ptrCast(?*c_void, context), | |
); | |
return switch (rc) { | |
0 => Self{ .handle = handle }, | |
else => error.SpawnError, | |
}; | |
} | |
pub fn join(self: Self) void { | |
const rc = std.c.pthread_join(self.handle, null); | |
std.debug.assert(rc == 0); | |
} | |
}; | |
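// LinuxThread avoids libc entirely: it mmaps one region holding a guard page,
// the thread stack, the Info block and the TLS image, then calls clone() with
// CLONE_CHILD_CLEARTID so join() can futex-wait on the child tid until the
// kernel zeroes it, at which point the mapping is unmapped.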
const LinuxThread = struct { | |
info: *Info, | |
const Self = @This(); | |
const Info = struct { | |
mmap_ptr: usize, | |
mmap_len: usize, | |
context: usize, | |
handle: i32, | |
}; | |
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self { | |
var mmap_size: usize = std.mem.page_size; | |
const guard_end = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + stack_size, std.mem.page_size); | |
const stack_end = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size, @alignOf(Info)); | |
const info_begin = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + @sizeOf(Info), std.os.linux.tls.tls_image.alloc_align); | |
const tls_begin = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + std.os.linux.tls.tls_image.alloc_size, std.mem.page_size); | |
const mmap_bytes = try std.os.mmap( | |
null, | |
mmap_size, | |
std.os.PROT_NONE, | |
std.os.MAP_PRIVATE | std.os.MAP_ANONYMOUS, | |
-1, | |
0, | |
); | |
errdefer std.os.munmap(mmap_bytes); | |
try std.os.mprotect( | |
mmap_bytes[guard_end..], | |
std.os.PROT_READ | std.os.PROT_WRITE, | |
); | |
const info = @ptrCast(*Info, @alignCast(@alignOf(Info), &mmap_bytes[info_begin])); | |
info.* = .{ | |
.mmap_ptr = @ptrToInt(mmap_bytes.ptr), | |
.mmap_len = mmap_bytes.len, | |
.context = @ptrToInt(context), | |
.handle = undefined, | |
}; | |
var user_desc: switch (std.builtin.arch) { | |
.i386 => std.os.linux.user_desc, | |
else => void, | |
} = undefined; | |
var tls_ptr = std.os.linux.tls.prepareTLS(mmap_bytes[tls_begin..]); | |
if (std.builtin.arch == .i386) { | |
defer tls_ptr = @ptrToInt(&user_desc); | |
user_desc = .{ | |
.entry_number = std.os.linux.tls.tls_image.gdt_entry_number, | |
.base_addr = tls_ptr, | |
.limit = 0xfffff, | |
.seg_32bit = 1, | |
.contents = 0, | |
.read_exec_only = 0, | |
.limit_in_pages = 1, | |
.seg_not_present = 0, | |
.useable = 1, | |
}; | |
} | |
const flags: u32 = | |
std.os.CLONE_SIGHAND | std.os.CLONE_SYSVSEM | | |
std.os.CLONE_VM | std.os.CLONE_FS | std.os.CLONE_FILES | | |
std.os.CLONE_PARENT_SETTID | std.os.CLONE_CHILD_CLEARTID | | |
std.os.CLONE_THREAD | std.os.CLONE_DETACHED | std.os.CLONE_SETTLS; | |
const Context = @TypeOf(context); | |
const Wrapper = struct { | |
fn entry(raw_arg: usize) callconv(.C) u8 { | |
const info_ptr = @intToPtr(*Info, raw_arg); | |
entryFn(@intToPtr(Context, info_ptr.context)); | |
return 0; | |
} | |
}; | |
const rc = std.os.linux.clone( | |
Wrapper.entry, | |
@ptrToInt(&mmap_bytes[stack_end]), | |
flags, | |
@ptrToInt(info), | |
&info.handle, | |
tls_ptr, | |
&info.handle, | |
); | |
return switch (std.os.linux.getErrno(rc)) { | |
0 => Self{ .info = info }, | |
else => error.SpawnError, | |
}; | |
} | |
pub fn join(self: Self) void { | |
while (true) { | |
const tid = @atomicLoad(i32, &self.info.handle, .SeqCst); | |
if (tid == 0) { | |
std.os.munmap(@intToPtr([*]align(std.mem.page_size) u8, self.info.mmap_ptr)[0..self.info.mmap_len]); | |
return; | |
} | |
const rc = std.os.linux.futex_wait(&self.info.handle, std.os.linux.FUTEX_WAIT, tid, null); | |
switch (std.os.linux.getErrno(rc)) { | |
0 => continue, | |
std.os.EINTR => continue, | |
std.os.EAGAIN => continue, | |
else => unreachable, | |
} | |
} | |
} | |
}; | |
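// Example workload: a fork/join fib(35) where every call suspends, reschedules
// itself onto the loop at High priority, and awaits two recursively spawned
// frames allocated on the heap.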
pub fn main() !void { | |
const Fib = struct { | |
fn call(allocator: *std.mem.Allocator, n: usize) anyerror!void { | |
if (n <= 1) | |
return; | |
suspend { | |
var task = Loop.Task.init(@frame()); | |
Loop.schedule(&task, .High); | |
} | |
const l = try allocator.create(@Frame(@This().call)); | |
defer allocator.destroy(l); | |
const r = try allocator.create(@Frame(@This().call)); | |
defer allocator.destroy(r); | |
l.* = async @This().call(allocator, n - 1); | |
r.* = async @This().call(allocator, n - 2); | |
const lv = await l; | |
const rv = await r; | |
try lv; | |
try rv; | |
} | |
}; | |
var gpa = std.heap.GeneralPurposeAllocator(.{}){}; | |
var win_heap = if (std.builtin.os.tag == .windows) std.heap.HeapAllocator.init() else {}; | |
const allocator = if (std.builtin.link_libc) | |
std.heap.c_allocator | |
else if (std.builtin.os.tag == .windows) | |
&win_heap.allocator | |
else | |
&gpa.allocator; | |
try (try Loop.run(.{}, Fib.call, .{allocator, 35})); | |
} |
const std = @import("std"); | |
const Scheduler = @This(); | |
counter: usize = 0, | |
idling: ?*Worker = null, | |
spawned: ?*Worker = null, | |
run_queues: [3]SchedulerQueue = [_]SchedulerQueue{.{}} ** 3, | |
pub const Platform = struct { | |
callFn: fn(*Platform, Action) void, | |
pub fn call(self: *Platform, action: Action) void { | |
return (self.callFn)(self, action); | |
} | |
pub const Event = union(enum) { | |
OnPoll: struct { | |
worker: *Worker, | |
}, | |
OnExecute: struct { | |
worker: *Worker, | |
task: *Task, | |
}, | |
}; | |
pub const Action = union(enum) { | |
HandleEvent: Event, | |
AcquireLock: struct { | |
scheduler: *Scheduler, | |
}, | |
ReleaseLock: struct { | |
scheduler: *Scheduler, | |
}, | |
SpawnWorker: struct { | |
scheduler: *Scheduler, | |
succeeded: *bool, | |
}, | |
SyncWorker: struct { | |
worker: *Worker, | |
}, | |
JoinWorker: struct { | |
worker: *Worker, | |
}, | |
SuspendWorker: struct { | |
worker: *Worker, | |
}, | |
ResumeWorker: struct { | |
worker: *Worker, | |
}, | |
PollWorker: struct { | |
worker: *Worker, | |
result: *?*Task, | |
}, | |
PollScheduler: struct { | |
worker: *Worker, | |
result: *?*Task, | |
}, | |
}; | |
}; | |
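// Illustrative sketch (not part of the original gist): one way a user of this Scheduler might
// implement the Platform interface. `NopPlatform` is an assumed name; it embeds a Platform,
// recovers the outer struct with @fieldParentPtr inside callFn, and switches on the Action.
const NopPlatform = struct {
    platform: Platform = Platform{ .callFn = onCall },

    fn onCall(platform: *Platform, action: Platform.Action) void {
        const self = @fieldParentPtr(NopPlatform, "platform", platform);
        _ = self;
        switch (action) {
            // Tell the scheduler that no worker thread could be spawned.
            .SpawnWorker => |spawn_worker| spawn_worker.succeeded.* = false,
            // Run a task inline when asked to execute one.
            .HandleEvent => |event| switch (event) {
                .OnExecute => |exec| (exec.task.callback)(exec.worker, exec.task),
                else => {},
            },
            else => {},
        }
    }
};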
pub const Config = struct { | |
max_workers: usize, | |
platform: *Platform, | |
}; | |
pub fn init(config: Config) Scheduler { | |
} | |
pub fn shutdown(self: *Scheduler) void { | |
} | |
pub fn join(self: *Scheduler) void { | |
} | |
pub const Task = struct { | |
next: ?*Task = undefined, | |
callback: fn(*Worker, *Task) void, | |
pub const Priority = enum { | |
Low = 0, | |
Normal = 1, | |
High = 2, | |
}; | |
pub const Group = struct { | |
len: usize = 0, | |
head: *Task = undefined, | |
tail: *Task = undefined, | |
pub fn from(task: *Task) Group { | |
task.next = null; | |
return .{ | |
.len = 1, | |
.head = task, | |
.tail = task, | |
}; | |
} | |
pub fn push(self: *Group, group: Group) void { | |
if (self.len == 0) { | |
self.* = group; | |
} else if (group.len != 0) { | |
self.tail.next = group.head; | |
self.tail = group.tail; | |
self.len += group.len; | |
} | |
} | |
pub fn pop(self: *Group) ?*Task { | |
if (self.len == 0) return null; | |
const task = self.head; | |
self.head = task.next orelse undefined; | |
self.len -= 1; | |
return task; | |
} | |
}; | |
}; | |
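// Illustrative sketch (not in the original gist): Task.Group links Tasks intrusively through
// their `next` fields, so batching is just list splicing with no allocation. `batch` is an
// assumed helper name used only for demonstration.
fn batch(tasks: []Task) Task.Group {
    var group = Task.Group{};
    for (tasks) |*task| {
        group.push(Task.Group.from(task));
    }
    return group;
}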
pub const Schedule = struct { | |
group: Task.Group, | |
affinity: ?*Worker = null, | |
priority: Task.Priority = .Normal, | |
}; | |
pub fn schedule(self: *Scheduler, sched: Schedule) void { | |
} | |
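// Illustrative usage sketch (not in the original gist): a Task carries its own callback, gets
// wrapped into a Group, and is submitted through a Schedule. `exampleTask` and `exampleSubmit`
// are assumed names for demonstration only.
fn exampleTask(worker: *Worker, task: *Task) void {
    _ = worker;
    _ = task;
}

fn exampleSubmit(self: *Scheduler) void {
    // NOTE: in real usage the Task must live until its callback has run.
    var task = Task{ .callback = exampleTask };
    self.schedule(.{
        .group = Task.Group.from(&task),
        .priority = .High,
    });
}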
pub const Worker = struct { | |
scheduler: *Scheduler, | |
run_queues: [3]RunQueue = [_]RunQueue{.{}} ** 3, | |
pub fn run(self: *Worker, scheduler: *Scheduler) void { | |
} | |
pub fn schedule(self: *Worker, sched: Schedule) void { | |
} | |
}; | |
const RunQueue = struct { | |
local: LifoStack = .{}, | |
overflow: FifoQueue = .{}, | |
buffer: LifoBuffer = .{}, | |
fn push(self: *RunQueue, group: Task.Group, is_local: bool) void { | |
if (group.len == 0) { | |
return; | |
} | |
if (is_local) { | |
self.local.push(group); | |
return; | |
} | |
if (self.buffer.push(group)) |overflowed| { | |
self.overflow.push(overflowed); | |
} | |
} | |
const Pop = struct { | |
task: ?*Task, | |
pushed: bool = false, | |
}; | |
fn pop(self: *RunQueue, be_fair: bool, is_local: bool) ?Pop { | |
if (is_local) { | |
const task = self.local.pop() orelse return null; | |
return Pop{ .task = task }; | |
} | |
if (be_fair) { | |
if (self.stealFifo(&self.overflow)) |pop| { | |
return pop; | |
} | |
} | |
if (self.buffer.pop()) |task| { | |
return Pop{ .task = task }; | |
} | |
return self.stealFifo(&self.overflow); | |
} | |
fn steal(self: *RunQueue, target: *RunQueue, be_fair: bool) ?Pop { | |
if (self == target) { | |
return self.pop(be_fair, false); | |
} | |
if (be_fair) { | |
if (self.stealFifo(&target.overflow)) |pop| { | |
return pop; | |
} | |
} | |
if (target.buffer.steal()) |task| { | |
return Pop{ .task = task }; | |
} | |
return self.stealFifo(&target.overflow); | |
} | |
fn stealFifo(self: *RunQueue, queue: *FifoQueue) ?Pop { | |
var consumer = queue.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
const task = consumer.pop() orelse return null; | |
const pushed = self.buffer.consume(&consumer); | |
return Pop{ | |
.task = task, | |
.pushed = pushed > 0, | |
}; | |
} | |
}; | |
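// Illustrative polling sketch (not in the original gist): how a worker might combine the
// RunQueue operations — try the thread-local LIFO slot first, then its own buffer/overflow,
// then steal from a sibling. `pollRunQueue` and the handling of `pushed` are assumptions.
fn pollRunQueue(self: *RunQueue, target: *RunQueue, be_fair: bool) ?*Task {
    if (self.pop(be_fair, true)) |popped| {
        return popped.task;
    }
    if (self.pop(be_fair, false)) |popped| {
        // popped.pushed hints that extra tasks were moved into our buffer,
        // which a real worker would use to wake up a sibling.
        return popped.task;
    }
    if (self.steal(target, be_fair)) |popped| {
        return popped.task;
    }
    return null;
}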
/// Unbounded MPSC stack of Tasks with amortized synchronization. | |
const LifoStack = struct { | |
stack: ?*Task = null, | |
local: ?*Task = null, | |
fn push(self: *LifoStack, group: Task.Group) void { | |
std.debug.assert(group.len > 0); | |
// Add the linked group of tasks to the stack using a CAS loop. | |
// Release barrier on success so pop() can see the linked tasks with Acquire. | |
var stack = @atomicLoad(?*Task, &self.stack, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
group.tail.next = stack; | |
stack = @cmpxchgWeak( | |
?*Task, | |
&self.stack, | |
stack, | |
group.head, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
} | |
fn pop(self: *LifoStack) ?*Task { | |
if (self.local) |task| { | |
self.local = task.next; | |
return task; | |
} | |
// If there are tasks in the shared stack, grab all of them. | |
// Acquire barrier in order to see the .next links made by the Release thread above. | |
var stack = @atomicLoad(?*Task, &self.stack, .Monotonic); | |
if (stack != null) { | |
stack = @atomicRmw(?*Task, &self.stack, .Xchg, null, .Acquire); | |
} | |
const task = stack orelse return null; | |
self.local = task.next; | |
return task; | |
} | |
}; | |
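// Illustrative sketch (not in the original gist): the single consumer thread drains the
// LifoStack by calling pop() until empty; each pop either serves from the cached `local`
// chain or grabs the whole shared stack in one Xchg. `drainLifoStack` is an assumed helper.
fn drainLifoStack(stack: *LifoStack, worker: *Worker) void {
    while (stack.pop()) |task| {
        (task.callback)(worker, task);
    }
}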
/// Modified version of MPSC linked below to be non-blocking MPMC. | |
/// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue. | |
const FifoQueue = struct { | |
head: usize = 0, | |
tail: ?*Task = null, | |
stub: Task = Task{ | |
.next = null, | |
.callback = undefined, | |
}, | |
fn isEmpty(self: *const FifoQueue) bool { | |
const tail = @atomicLoad(?*Task, &self.tail, .Monotonic); | |
return (tail == null) or (tail == &self.stub); | |
} | |
fn push(self: *FifoQueue, group: Task.Group) void { | |
std.debug.assert(group.len > 0); | |
std.debug.assert(group.tail.next == null); | |
// Swap the tail with our group's tail in order to insert the group into the queue. | |
// Acquire barrier to prevent the store below from being reordered before updating queue.tail. | |
// Release barrier to publish all the group.tail writes we've done to the consumer thread. | |
const tail = @atomicRmw(?*Task, &self.tail, .Xchg, group.tail, .AcqRel); | |
// We don't set the tail to point to the stub on init like the reference impl. | |
// Instead we use null as a sentinel to mean "replace with stub" | |
const prev = tail orelse &self.stub; | |
// Finally, we correctly join the group into the queue. | |
// Release barrier for the load(&prev.next, .Acquire) thread to see the full group links. | |
// | |
// Between here and the swap() above, the queue could be incorrectly observed to be empty. | |
// In the consumer we just return null, since every push to a queue is followed by notifying a worker, | |
// which covers any observation that missed the full group append. | |
@atomicStore(?*Task, &prev.next, group.head, .Release); | |
} | |
fn tryAcquireConsumer(self: *FifoQueue) ?Consumer { | |
// It's faster to test whether the queue is empty before trying to acquire the consumer. | |
// This appears to reduce the cost of checking FifoQueues given how frequently they're polled. | |
if (self.isEmpty()) { | |
return null; | |
} else { | |
return self.tryAcquireConsumerSlow(); | |
} | |
} | |
const IS_CONSUMING = 0b1; | |
fn tryAcquireConsumerSlow(self: *FifoQueue) ?Consumer { | |
@setCold(true); | |
while (true) { | |
// Bail if there's already a consumer. This provides exclusive access to the MPSC. | |
const head = @atomicLoad(usize, &self.head, .Monotonic); | |
if (head & IS_CONSUMING != 0) { | |
return null; | |
} | |
// Acquire barrier on success in order to see any Task writes done by previous consumer. | |
_ = @cmpxchgWeak( | |
usize, | |
&self.head, | |
head, | |
head | IS_CONSUMING, | |
.Acquire, | |
.Monotonic, | |
) orelse return Consumer{ | |
.queue = self, | |
.stub = &self.stub, | |
.head = @intToPtr(?*Task, head) orelse &self.stub, | |
}; | |
std.Thread.spinLoopHint(); | |
if (self.isEmpty()) { | |
return null; | |
} | |
} | |
} | |
const Consumer = struct { | |
queue: *FifoQueue, | |
stub: *Task, | |
head: *Task, | |
fn release(self: Consumer) void { | |
// Update the queue head and unset the IS_CONSUMING bit at the same time. | |
// Release barrier for next consumer to see any stub/head writes this thread has done. | |
const new_head = @ptrToInt(self.head); | |
@atomicStore(usize, &self.queue.head, new_head, .Release); | |
} | |
fn pop(self: *Consumer) ?*Task { | |
// Skip the stub node if it's there. | |
// Acquire barrier to see the linked updates done by push()'s Release. | |
var head = self.head; | |
if (head == self.stub) { | |
head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
self.head = head; | |
} | |
// If there's a following node, then we can safely pop the current head. | |
// Acquire barrier to see the linked updates done by push()'s Release. | |
if (@atomicLoad(?*Task, &head.next, .Acquire)) |next| { | |
self.head = next; | |
return head; | |
} | |
// Check if the producer is in the middle of attaching its Group to the queue. | |
// If so, bail for now and try again later when the producer eventually wakes us up. | |
const tail = @atomicLoad(?*Task, &self.queue.tail, .Monotonic); | |
if (tail != head) { | |
return null; | |
} | |
// In order for us to pop the current head, we need it to not be the last one. | |
// So we push the stub after it to keep this invariant. | |
self.queue.push(Task.Group.from(self.stub)); | |
// Finally, try to pop the current head again (See above) | |
const next = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
self.head = next; | |
return head; | |
} | |
}; | |
}; | |
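// Illustrative consumer sketch (not in the original gist): acquire the FifoQueue's exclusive
// consumer, pop tasks until it reports null (empty, or a producer is mid-push), then release
// so another thread can consume later. `drainFifoQueue` is an assumed helper.
fn drainFifoQueue(queue: *FifoQueue, worker: *Worker) void {
    var consumer = queue.tryAcquireConsumer() orelse return;
    defer consumer.release();
    while (consumer.pop()) |task| {
        (task.callback)(worker, task);
    }
}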
/// Slightly modified version of the Chase-Lev SPMC work-stealing deque. | |
/// https://fzn.fr/readings/ppopp13.pdf | |
const LifoBuffer = struct { | |
head: Index = 0, | |
tail: Index = 0, | |
buffer: [capacity]*Task = undefined, | |
const Index = u8; | |
const capacity = 128; | |
comptime { | |
std.debug.assert(capacity <= std.math.maxInt(Index)); | |
} | |
/// Try to push the group of tasks to the bounded ring buffer. | |
/// If unable to, then return a group of tasks which overflowed. | |
fn push(self: *LifoBuffer, group: Task.Group) ?Task.Group { | |
std.debug.assert(group.len > 0); | |
if (group.len >= capacity) { | |
return group; | |
} | |
var tasks = group; | |
var tail = self.tail; // we're the only producer | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
while (true) { | |
const size = tail -% head; | |
// Write the tasks into the ring buffer if there's space for them all. | |
const free_slots = capacity - size; | |
if (tasks.len <= free_slots) { | |
// Use atomic stores to avoid a data-race with a steal()'er trying to read a wrapped-around buffer slot. | |
// Use Unordered since the store only needs to be atomic and not ordered with anything else. | |
while (tasks.pop()) |task| { | |
@atomicStore(*Task, &self.buffer[tail % capacity], task, .Unordered); | |
tail +%= 1; | |
} | |
// Release store to the tail so steal()s that load it with Acquire see the buffer writes. | |
@atomicStore(Index, &self.tail, tail, .Release); | |
return null; | |
} | |
// If the ring buffer isn't at least half full, | |
// then it's better not to pay the cost of migrating tasks out of the buffer below. | |
const half = capacity / 2; | |
if (free_slots > half) { | |
return tasks; | |
} | |
// The buffer is at least half full and writing tasks from our group would overflow it. | |
// To amortize the cost of returning overflowed Groups in the future, we make room in the ring buffer. | |
// | |
// Try to effectively steal half of the tasks in the ring buffer so that we can report them as overflowed. | |
// Acquire barrier on success to ensure that the writes to the Tasks in the buffer below don't happen until we mark them as stolen. | |
const new_head = head +% (size / 2); | |
if (@cmpxchgWeak(Index, &self.head, head, new_head, .Acquire, .Monotonic)) |updated| { | |
head = updated; | |
continue; | |
} | |
var overflowed = Task.Group{}; | |
while (head != new_head) : (head +%= 1) { | |
const task = self.buffer[head % capacity]; | |
overflowed.push(Task.Group.from(task)); | |
} | |
overflowed.push(tasks); | |
return overflowed; | |
} | |
} | |
fn consume(self: *LifoBuffer, consumer: *FifoQueue.Consumer) Index { | |
var tail = self.tail; // we're the only producer | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
const free_slots = capacity - (tail -% head); | |
// See push() for reasoning behind atomic Unordered stores to buffer. | |
var pushed: Index = 0; | |
while (pushed < free_slots) : (pushed += 1) { | |
const task = consumer.pop() orelse break; | |
const index = (tail +% pushed) % capacity; | |
@atomicStore(*Task, &self.buffer[index], task, .Unordered); | |
} | |
// See push() for reasoning behind Release store to tail on buffer write. | |
if (pushed > 0) { | |
@atomicStore(Index, &self.tail, tail +% pushed, .Release); | |
} | |
return pushed; | |
} | |
fn steal(self: *LifoBuffer) ?*Task { | |
// Load the head before loading the tail in order to ensure it sees a correct view of the buffer size. | |
// Acquire barrier to keep the load of the tail below from happening before the head load. | |
// SeqCst to have a total ordering relationship with pop() for observing head before tail. | |
var head = @atomicLoad(Index, &self.head, .SeqCst); | |
while (true) { | |
// Acquire barrier to see the Release'd buffer writes done in push() and consume(). | |
// SeqCst barrier to have a total ordering relationship with pop() for observing tail after head. | |
const tail = @atomicLoad(Index, &self.tail, .SeqCst); | |
// Handle both the case where the buffer is empty and the case where pop() below updates the tail while it's empty. | |
if ((tail == head) or ((tail -% 1) == head)) { | |
return null; | |
} | |
// Load the task from the buffer atomically to avoid data-race with the producer wrapping around and writing. | |
// Must happen after Acquiring the tail in order to see the task written by the producer. | |
const task = @atomicLoad(*Task, &self.buffer[head % capacity], .Unordered); | |
// Update the head to try and mark the task as stolen. | |
// | |
// Acquire barrier on success to keep any task updates done after stealing from being reordered before the actual steal. | |
// Release barrier on success to keep the task load from the buffer from happening after the steal, since it could be overwritten by then. | |
// SeqCst barrier on success to have a total ordering relationship with pop() for observing head before tail. | |
// | |
// Acquire barrier on failure so that the load of the head happens before the tail above in program order. | |
head = @cmpxchgStrong(Index, &self.head, head, head +% 1, .SeqCst, .Acquire) orelse { | |
std.Thread.spinLoopHint(); | |
return task; | |
}; | |
} | |
} | |
fn pop(self: *LifoBuffer) ?*Task { | |
// we're the only producer | |
const tail = self.tail; | |
// Quick preemptive check if the queue is empty to avoid the SeqCst atomics below. | |
if (@atomicLoad(Index, &self.head, .Monotonic) == tail) { | |
return null; | |
} | |
// Update the tail to preemptively mark its slot as consumed, **then** observe the head. | |
// The head must be observed after the tail update, or else steal()s could take Tasks without us noticing. | |
// SeqCst barrier is needed on the atomics to provide a Store-Load reordering constraint & total ordering with steal() loads/updates. | |
const new_tail = tail -% 1; | |
@atomicStore(Index, &self.tail, new_tail, .SeqCst); | |
const head = @atomicLoad(Index, &self.head, .SeqCst); | |
var task: ?*Task = null; | |
if (head != tail) { | |
task = self.buffer[new_tail % capacity]; | |
// If this isn't the last task, then we've already marked it stolen so we return it with ownership. | |
if (head != new_tail) { | |
return task; | |
} | |
// This is the last task in the buffer so we need to race with a steal() thread in claiming it. | |
// SeqCst on success to have a total ordering relationship with the loads/CAS from the steal() thread. | |
if (@cmpxchgStrong(Index, &self.head, head, tail, .SeqCst, .Monotonic) != null) { | |
task = null; | |
} | |
} | |
// Whether or not we claimed the last task, restore the tail we speculatively updated with SeqCst above. | |
@atomicStore(Index, &self.tail, tail, .Monotonic); | |
return task; | |
} | |
}; |