One-file event loop
const std = @import("std"); | |
const Allocator = std.mem.Allocator; | |
pub const Loop = struct { | |
io_poller: IoPoller, | |
allocator: *Allocator, | |
is_running: bool = true, | |
is_notifying: bool = false, | |
run_queue: RunQueue = .{}, | |
spawned_threads: usize = 0, | |
threads: []*Platform.Thread = &[_]*Platform.Thread{}, | |
const Self = @This(); | |
const IoPoller = Io.Backend.Poller; | |
var global_instance: Self = undefined; | |
pub const instance: ?*Self = switch (true) { | |
true => &global_instance, | |
else => null, | |
}; | |
pub fn init(self: *Self, allocator: *Allocator) !void { | |
self.* = .{ | |
.io_poller = undefined, | |
.allocator = allocator, | |
}; | |
try self.io_poller.init(); | |
errdefer self.io_poller.deinit(); | |
var extra_threads: usize = 0; | |
if (!std.builtin.single_threaded) | |
extra_threads = (Platform.Thread.cpuCount() catch 1) - 1; | |
self.threads = try self.allocator.alloc(*Platform.Thread, extra_threads); | |
// Don't call deinit() here: the errdefer above already handles io_poller, and
// running both would deinitialize it twice.
errdefer {
self.shutdown();
for (self.threads[0..self.spawned_threads]) |thread|
thread.wait();
self.allocator.free(self.threads);
}
for (self.threads) |*thread| { | |
thread.* = try Platform.Thread.spawn(Self.run, self); | |
self.spawned_threads += 1; | |
} | |
} | |
pub fn deinit(self: *Self) void { | |
self.shutdown(); | |
for (self.threads[0..self.spawned_threads]) |thread| | |
thread.wait(); | |
self.allocator.free(self.threads); | |
self.io_poller.deinit(); | |
self.* = undefined; | |
} | |
pub fn shutdown(self: *Self) void { | |
@atomicStore(bool, &self.is_running, false, .SeqCst); | |
self.notify(); | |
} | |
pub const Runnable = struct { | |
next: ?*Runnable = undefined, | |
frame: usize, // stored as usize instead of anyframe to support being printed | |
pub fn init(frame: anyframe) Runnable { | |
return .{ .frame = @ptrToInt(frame) }; | |
} | |
pub fn run(self: *Runnable) void { | |
resume @intToPtr(anyframe, self.frame); | |
} | |
}; | |
pub const Batch = struct { | |
head: ?*Runnable = null, | |
tail: *Runnable = undefined, | |
pub fn from(runnable: *Runnable) Batch { | |
runnable.next = null; | |
return Batch{ | |
.head = runnable, | |
.tail = runnable, | |
}; | |
} | |
pub fn isEmpty(self: Batch) bool { | |
return self.head == null; | |
} | |
pub fn push(self: *Batch, batch: Batch) void { | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else if (!batch.isEmpty()) { | |
self.tail.next = batch.head; | |
self.tail = batch.tail; | |
} | |
} | |
pub fn pop(self: *Batch) ?*Runnable { | |
const runnable = self.head orelse return null; | |
self.head = runnable.next; | |
return runnable; | |
} | |
pub fn iter(self: Batch) Iter { | |
return .{ .runnable = self.head }; | |
} | |
pub const Iter = struct { | |
runnable: ?*Runnable = null, | |
pub fn next(self: *Iter) ?*Runnable { | |
const runnable = self.runnable orelse return null; | |
self.runnable = runnable.next; | |
return runnable; | |
} | |
}; | |
}; | |
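// RunQueue: a mutex-protected Batch. `has_pending` is maintained with atomics
// so pop() can skip acquiring the lock when the queue is known to be empty.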
const RunQueue = struct { | |
lock: Platform.Lock = .{}, | |
batch: Batch = .{}, | |
has_pending: bool = false, | |
fn push(self: *RunQueue, batch: Batch) void { | |
if (batch.isEmpty()) | |
return; | |
const held = self.lock.acquire(); | |
defer held.release(); | |
self.batch.push(batch); | |
@atomicStore(bool, &self.has_pending, true, .SeqCst); | |
} | |
fn pop(self: *RunQueue) ?*Runnable { | |
if (@atomicLoad(bool, &self.has_pending, .SeqCst) == false) | |
return null; | |
const held = self.lock.acquire(); | |
defer held.release(); | |
return self.batch.pop() orelse { | |
@atomicStore(bool, &self.has_pending, false, .SeqCst); | |
return null; | |
}; | |
} | |
}; | |
pub fn schedule(self: *Self, batch: Batch) void { | |
self.run_queue.push(batch); | |
self.notify(); | |
} | |
pub fn yield(self: *Self) void { | |
suspend { | |
var runnable = Runnable.init(@frame()); | |
self.schedule(Batch.from(&runnable)); | |
} | |
} | |
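// Worker loop: drain the run queue and block in the IO poller when it runs
// dry. A thread that finds work right after waiting wakes another waiter so
// wakeups keep propagating while work is still arriving.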
pub fn run(self: *Self) void { | |
var was_waiting = false; | |
while (true) { | |
if (self.run_queue.pop()) |runnable| { | |
if (was_waiting) | |
self.notify(); | |
was_waiting = false; | |
runnable.run(); | |
continue; | |
} | |
if (@atomicLoad(bool, &self.is_running, .SeqCst)) { | |
self.wait(); | |
was_waiting = true; | |
continue; | |
} | |
self.notify(); | |
return; | |
} | |
} | |
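// Coalesce poller wakeups: only the first caller flips `is_notifying` and
// pokes the poller; wait() clears the flag once it sees the notification
// event come back out of the poller.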
fn notify(self: *Self) void { | |
if (@atomicLoad(bool, &self.is_notifying, .SeqCst)) | |
return; | |
if (@atomicRmw(bool, &self.is_notifying, .Xchg, true, .SeqCst)) | |
return; | |
self.io_poller.notify(); | |
} | |
fn wait(self: *Self) void { | |
var events: IoPoller.Events = undefined; | |
self.io_poller.poll(&events, null); | |
var batch = Batch{}; | |
defer self.run_queue.push(batch); | |
while (true) { | |
const maybe_runnable = events.next() catch { | |
@atomicStore(bool, &self.is_notifying, false, .SeqCst); | |
continue; | |
}; | |
const runnable = maybe_runnable orelse break; | |
batch.push(Batch.from(runnable)); | |
} | |
} | |
}; | |
pub const sync = struct { | |
pub const Thread = struct { | |
fn ReturnTypeOf(comptime asyncFn: anytype) type { | |
return @typeInfo(@TypeOf(asyncFn)).Fn.return_type.?; | |
} | |
pub fn Handle(comptime T: type) type { | |
return struct { | |
frame: anyframe->T, | |
freeFn: fn(*@This()) void, | |
// TODO: must use (await (async self.join())) unless segfault | |
pub fn join(self: *@This()) T { | |
const result = await self.frame; | |
(self.freeFn)(self); | |
return result; | |
} | |
}; | |
} | |
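// spawn() heap-allocates the async frame together with its Handle; join()
// awaits the frame and then releases that allocation through freeFn.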
pub fn spawn(comptime asyncFn: anytype, args: anytype) !*Handle(ReturnTypeOf(asyncFn)) { | |
const loop = Loop.instance orelse { | |
@compileError("Evented IO is not enabled. Use `pub const io_mode = .evented` in root file"); | |
}; | |
const Args = @TypeOf(args); | |
const AsyncHandle = struct { | |
frame: @Frame(@This().run), | |
handle: Handle(ReturnTypeOf(asyncFn)), | |
fn free(handle: *Handle(ReturnTypeOf(asyncFn))) void { | |
const self = @fieldParentPtr(@This(), "handle", handle); | |
loop.allocator.destroy(self); | |
} | |
fn run(fn_args: Args) ReturnTypeOf(asyncFn) { | |
loop.yield(); | |
return @call(.{}, asyncFn, fn_args); | |
} | |
}; | |
const async_handle = try loop.allocator.create(AsyncHandle); | |
(&async_handle.frame).* = async AsyncHandle.run(args); | |
async_handle.handle.freeFn = AsyncHandle.free; | |
async_handle.handle.frame = &async_handle.frame; | |
return &async_handle.handle; | |
} | |
}; | |
pub const Condvar = struct { | |
state: usize = 0, | |
semaphore: Semaphore = Semaphore.init(0), | |
const NOTIFIED: usize = 1 << 0; | |
const WAITING: usize = 1 << 1; | |
pub fn wait(self: *Condvar, held: Mutex.Held) void { | |
held.release(); | |
defer _ = held.mutex.acquire(); | |
// NOTIFIED -> 0: consumed notification | |
// x -> x + WAITING: wait on semaphore | |
var state = @atomicLoad(usize, &self.state, .SeqCst); | |
while (true) { | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
if (state == NOTIFIED) 0 else state + WAITING, | |
.SeqCst, | |
.SeqCst, | |
) orelse break; | |
} | |
if (state & NOTIFIED == 0) | |
self.semaphore.wait(); | |
} | |
pub fn signal(self: *Condvar) void { | |
// NOTIFIED: nothing left to do | |
// 0 -> NOTIFIED: left a notification | |
// x -> x - WAITING: post to semaphore | |
var state = @atomicLoad(usize, &self.state, .SeqCst); | |
while (true) { | |
if (state == NOTIFIED) | |
return; | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
if (state == 0) NOTIFIED else state - WAITING, | |
.SeqCst, | |
.SeqCst, | |
) orelse break; | |
} | |
if (state != 0) | |
self.semaphore.post(); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
// NOTIFIED: nothing left to do | |
// 0 -> NOTIFIED: left a notification | |
// x -> 0: post to semaphore for every WAITER in x | |
var state = @atomicLoad(usize, &self.state, .SeqCst); | |
while (true) { | |
if (state == NOTIFIED) | |
return; | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
if (state == 0) NOTIFIED else 0, | |
.SeqCst, | |
.SeqCst, | |
) orelse break; | |
} | |
var waiters = state >> 1; | |
while (waiters > 0) : (waiters -= 1) | |
self.semaphore.post(); | |
} | |
}; | |
pub const Mutex = struct { | |
semaphore: Semaphore = Semaphore.init(1), | |
pub fn tryAcquire(self: *Mutex) ?Held { | |
if (self.semaphore.tryWait()) | |
return Held{ .mutex = self }; | |
return null; | |
} | |
pub fn acquire(self: *Mutex) Held { | |
self.semaphore.wait(); | |
return Held{ .mutex = self }; | |
} | |
pub const Held = struct { | |
mutex: *Mutex, | |
pub fn release(self: Held) void { | |
self.mutex.semaphore.post(); | |
} | |
}; | |
}; | |
pub const Semaphore = struct { | |
channel: Channel(void), | |
pub fn init(permits: usize) Semaphore { | |
return Semaphore{ | |
.channel = Channel(void){ | |
.head = 0, | |
.tail = permits, | |
.buffer = {}, | |
.capacity = std.math.maxInt(usize), | |
}, | |
}; | |
} | |
pub fn tryWait(self: *Semaphore) bool { | |
return self.channel.tryGet() != null; | |
} | |
pub fn wait(self: *Semaphore) void { | |
_ = self.channel.get(); | |
} | |
pub fn post(self: *Semaphore) void { | |
std.debug.assert(self.channel.tryPut(undefined)); | |
} | |
}; | |
pub fn Channel(comptime T: type) type { | |
return struct { | |
lock: Platform.Lock = .{}, | |
head: usize = 0, | |
tail: usize = 0, | |
putters: Waiters = .{}, | |
getters: Waiters = .{}, | |
buffer: if (@sizeOf(T) > 0) [*]T else void, | |
capacity: usize, | |
const Self = @This(); | |
const Mode = enum { Blocking, NonBlocking }; | |
const Waiters = std.SinglyLinkedList(struct { | |
item: T, | |
runnable: Loop.Runnable, | |
}); | |
pub fn tryPut(self: *Self, item: T) bool { | |
return self.putUsing(item, .NonBlocking) != null; | |
} | |
pub fn put(self: *Self, item: T) void { | |
std.debug.assert(self.putUsing(item, .Blocking) != null); | |
} | |
fn putUsing(self: *Self, item: T, comptime mode: Mode) ?void { | |
const held = self.lock.acquire(); | |
if (self.notify(&self.getters, held, item)) |_| { | |
return; | |
} | |
if (self.tail -% self.head < self.capacity) { | |
if (@sizeOf(T) > 0) | |
self.buffer[self.tail % self.capacity] = item; | |
self.tail +%= 1; | |
held.release(); | |
return; | |
} | |
if (mode == .NonBlocking) { | |
held.release(); | |
return null; | |
} | |
// Discard wait()'s returned item: only getters care about it, and putUsing
// itself returns ?void.
_ = self.wait(&self.putters, held, item);
return;
} | |
pub fn tryGet(self: *Self) ?T { | |
return self.getUsing(.NonBlocking); | |
} | |
pub fn get(self: *Self) T { | |
return self.getUsing(.Blocking) orelse unreachable; | |
} | |
fn getUsing(self: *Self, comptime mode: Mode) ?T { | |
const held = self.lock.acquire(); | |
if (self.notify(&self.putters, held, null)) |item| | |
return item; | |
if (self.tail != self.head) { | |
var item: T = undefined; | |
if (@sizeOf(T) > 0) | |
item = self.buffer[self.head % self.capacity]; | |
self.head +%= 1; | |
held.release(); | |
return item; | |
} | |
if (mode == .NonBlocking) { | |
held.release(); | |
return null; | |
} | |
return self.wait(&self.getters, held, null); | |
} | |
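// Park the caller on the given waiter list. The channel lock is only released
// inside the suspend block, so a concurrent notify() cannot resume the frame
// before it has actually suspended.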
fn wait(self: *Self, waiters: *Waiters, held: Platform.Lock.Held, write_item: ?T) T { | |
var waiter: Waiters.Node = undefined; | |
if (write_item) |write| | |
waiter.data.item = write; | |
waiter.data.runnable = Loop.Runnable.init(@frame()); | |
waiters.prepend(&waiter); | |
suspend { | |
held.release(); | |
} | |
return waiter.data.item; | |
} | |
fn notify(self: *Self, waiters: *Waiters, held: Platform.Lock.Held, write_item: ?T) ?T { | |
const waiter = waiters.popFirst() orelse return null; | |
const item = waiter.data.item; | |
if (write_item) |write| | |
waiter.data.item = write; | |
const loop = Loop.instance orelse { | |
@compileError("Evented IO is not enabled. Use `pub const io_mode = .evented` in root file"); | |
}; | |
loop.schedule(Loop.Batch.from(&waiter.data.runnable)); | |
held.release(); | |
return item; | |
} | |
}; | |
} | |
}; | |
pub const Io = struct { | |
pub const Backend = switch (std.builtin.os.tag) { | |
.linux => LinuxBackend, | |
.windows => @compileError("TODO"), | |
.macos, .freebsd, .openbsd, .netbsd, .dragonfly => @compileError("TODO"), | |
else => @compileError("Platform not supported for IO"), | |
}; | |
const LinuxBackend = PosixBackend(struct { | |
epoll_fd: std.os.fd_t, | |
notify_fd: std.os.fd_t, | |
registered_notify_fd: bool, | |
const Self = @This(); | |
pub fn init(self: *Self) !void { | |
self.epoll_fd = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC); | |
errdefer std.os.close(self.epoll_fd); | |
self.notify_fd = try std.os.eventfd(0, std.os.EFD_CLOEXEC | std.os.EFD_NONBLOCK); | |
errdefer std.os.close(self.notify_fd); | |
self.registered_notify_fd = false; | |
} | |
pub fn deinit(self: *Self) void { | |
std.os.close(self.notify_fd); | |
std.os.close(self.epoll_fd); | |
self.* = undefined; | |
} | |
pub fn notify(self: *Self) void { | |
var epoll_event = std.os.epoll_event{ | |
.events = std.os.EPOLLOUT | std.os.EPOLLONESHOT, | |
.data = .{ .ptr = 0 }, | |
}; | |
var epoll_ctl_op: u32 = std.os.EPOLL_CTL_ADD; | |
if (self.registered_notify_fd) { | |
epoll_ctl_op = std.os.EPOLL_CTL_MOD; | |
} else { | |
self.registered_notify_fd = true; | |
} | |
std.os.epoll_ctl( | |
self.epoll_fd, | |
epoll_ctl_op, | |
self.notify_fd, | |
&epoll_event, | |
) catch unreachable; | |
} | |
pub fn poll(self: *Self, events: []Event, timeout: ?u64) usize { | |
var timeout_ms: i32 = -1; | |
if (timeout) |timeout_ns| { | |
timeout_ms = std.math.cast(i32, timeout_ns / std.time.ns_per_ms) catch -1; | |
} | |
return std.os.epoll_wait( | |
self.epoll_fd, | |
@ptrCast([*]std.os.epoll_event, events.ptr)[0..events.len], | |
timeout_ms, | |
); | |
} | |
pub const Event = extern struct { | |
epoll_event: std.os.epoll_event, | |
pub fn getData(self: Event) usize { | |
return self.epoll_event.data.ptr; | |
} | |
pub fn isNotified(self: Event) bool { | |
return self.getData() == 0; | |
} | |
pub fn isReadable(self: Event) bool { | |
const flags = std.os.EPOLLERR | std.os.EPOLLHUP | std.os.EPOLLIN | std.os.EPOLLRDHUP; | |
return self.epoll_event.events & flags != 0; | |
} | |
pub fn isWritable(self: Event) bool { | |
const flags = std.os.EPOLLERR | std.os.EPOLLHUP | std.os.EPOLLOUT;
return self.epoll_event.events & flags != 0; | |
} | |
}; | |
}); | |
fn PosixBackend(comptime IoPoller: type) type { | |
return struct { | |
pub const Poller = struct { | |
io_poller: IoPoller, | |
const Self = @This(); | |
pub fn init(self: *Self) !void { | |
try self.io_poller.init(); | |
} | |
pub fn deinit(self: *Self) void { | |
self.io_poller.deinit(); | |
} | |
pub fn notify(self: *Self) void { | |
self.io_poller.notify(); | |
} | |
pub fn poll(self: *Self, events: *Events, timeout: ?u64) void { | |
events.* = Events{}; | |
events.found = self.io_poller.poll(&events.events, timeout); | |
return; | |
} | |
pub const Events = struct { | |
events: [64]IoPoller.Event = undefined, | |
readers: ?*Loop.Runnable = null, | |
writers: ?*Loop.Runnable = null, | |
index: usize = 0, | |
found: usize = 0, | |
fn pop(stack: *?*Loop.Runnable) ?*Loop.Runnable { | |
const runnable = stack.* orelse return null; | |
stack.* = runnable.next; | |
return runnable; | |
} | |
pub fn next(self: *Events) error{Notified}!?*Loop.Runnable { | |
if (pop(&self.writers) orelse pop(&self.readers)) |runnable| | |
return runnable; | |
while (self.index < self.found) { | |
const event = self.events[self.index]; | |
self.index += 1; | |
if (event.isNotified()) | |
return error.Notified; | |
const async_fd = @intToPtr(*AsyncFd, event.getData()); | |
if (event.isWritable()) | |
self.writers = async_fd.wake(.Write); | |
if (event.isReadable()) | |
self.readers = async_fd.wake(.Read); | |
if (pop(&self.writers) orelse pop(&self.readers)) |runnable| | |
return runnable; | |
} | |
return null; | |
} | |
}; | |
}; | |
const AsyncFd = struct { | |
fd: std.os.fd_t, | |
next: ?*Self = null, | |
reader: ?*Loop.Runnable = null, | |
writer: ?*Loop.Runnable = null, | |
const Self = @This(); | |
const Event = enum { Read, Write }; | |
const NOTIFIED = @intToPtr(*Loop.Runnable, @alignOf(Loop.Runnable)); | |
var cache_lock = Platform.Lock{};
var cache_top: ?*Self = null; | |
var cache_gpa = std.heap.GeneralPurposeAllocator(.{}){}; | |
fn alloc() !*Self { | |
const held = cache_lock.acquire(); | |
defer held.release(); | |
if (cache_top) |self| { | |
cache_top = self.next; | |
return self; | |
} else { | |
return cache_gpa.allocator.create(Self); | |
} | |
} | |
fn free(self: *Self) void { | |
const held = cache_lock.acquire(); | |
defer held.release(); | |
self.next = cache_top; | |
cache_top = self; | |
} | |
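// `reader`/`writer` act as single-pointer wait stacks with a NOTIFIED sentinel:
// wake() swaps in NOTIFIED when nobody is queued so a later wait() can consume
// the readiness without suspending, avoiding the lost-wakeup race.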
fn wait(self: *Self, event: Event) void { | |
const queue_ptr = switch (event) { | |
.Read => &self.reader, | |
.Write => &self.writer, | |
}; | |
var runnable = Loop.Runnable.init(@frame()); | |
suspend { | |
// NOTIFIED -> null: consume notification & tail resume | |
// x -> our runnable: enqueue ourselves to stack and suspend | |
var queue = @atomicLoad(?*Loop.Runnable, queue_ptr, .SeqCst); | |
while (true) { | |
runnable.next = queue; | |
queue = @cmpxchgWeak( | |
?*Loop.Runnable, | |
queue_ptr, | |
queue, | |
if (queue == NOTIFIED) null else &runnable, | |
.SeqCst, | |
.SeqCst, | |
) orelse { | |
if (queue == NOTIFIED) | |
Loop.instance.?.schedule(Loop.Batch.from(&runnable));
break; | |
}; | |
} | |
} | |
} | |
fn wake(self: *Self, event: Event) ?*Loop.Runnable { | |
const queue_ptr = switch (event) { | |
.Read => &self.reader, | |
.Write => &self.writer, | |
}; | |
var queue = @atomicLoad(?*Loop.Runnable, queue_ptr, .SeqCst); | |
while (true) { | |
if (queue == NOTIFIED) | |
return null; | |
queue = @cmpxchgWeak( | |
?*Loop.Runnable, | |
queue_ptr, | |
queue, | |
if (queue == null) NOTIFIED else null, | |
.SeqCst, | |
.SeqCst, | |
) orelse { | |
return queue; | |
}; | |
} | |
} | |
}; | |
}; | |
} | |
}; | |
const Platform = struct { | |
pub const Thread = std.Thread; | |
pub usingnamespace if (std.builtin.os.tag == .windows) | |
WindowsPlatform | |
else if (std.Target.current.isDarwin()) | |
DarwinPlatform | |
else if (std.builtin.link_libc) | |
PosixPlatform | |
else if (std.builtin.os.tag == .linux) | |
LinuxPlatform | |
else | |
@compileError("Platform not supported"); | |
const WindowsPlatform = @compileError("TODO"); | |
const DarwinPlatform = @compileError("TODO"); | |
const PosixPlatform = struct { | |
pub const Lock = struct { | |
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, | |
const Self = @This(); | |
pub fn acquire(self: *Self) Held { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
std.debug.assert(std.c.pthread_mutex_unlock(&self.lock.mutex) == 0); | |
} | |
}; | |
}; | |
}; | |
const LinuxPlatform = struct { | |
pub const Lock = struct { | |
state: State = .Unlocked, | |
const Self = @This(); | |
const State = enum(i32) { | |
Unlocked = 0, | |
Locked, | |
Contended, | |
}; | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
if (@cmpxchgWeak( | |
State, | |
&self.state, | |
.Unlocked, | |
.Locked, | |
.Acquire, | |
.Monotonic, | |
)) |_| { | |
self.acquireSlow(); | |
} | |
return Held{ .lock = self }; | |
} | |
fn acquireSlow(self: *Self) void { | |
@setCold(true); | |
var spin: usize = 0; | |
var lock_state = State.Locked; | |
var state = @atomicLoad(State, &self.state, .Monotonic); | |
while (true) { | |
if (state == .Unlocked) { | |
state = @cmpxchgWeak( | |
State, | |
&self.state, | |
state, | |
lock_state, | |
.Acquire, | |
.Monotonic, | |
) orelse return; | |
std.Thread.spinLoopHint(); | |
continue; | |
} | |
const max_spin = switch (std.builtin.arch) { | |
.i386, .x86_64 => 10, | |
.aarch64 => 5, | |
else => 0, | |
}; | |
if (state == .Locked and spin < max_spin) { | |
spin += 1; | |
std.Thread.spinLoopHint(); | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
continue; | |
} | |
if (state != .Contended) { | |
if (@cmpxchgWeak( | |
State, | |
&self.state, | |
state, | |
.Contended, | |
.Monotonic, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
state = updated; | |
continue; | |
} | |
} | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@enumToInt(State.Contended), | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => unreachable, | |
else => unreachable, | |
} | |
spin = 0; | |
lock_state = .Contended; | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
} | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
switch (@atomicRmw(State, &self.lock.state, .Xchg, .Unlocked, .Release)) { | |
.Unlocked => unreachable, | |
.Locked => {}, | |
.Contended => self.releaseSlow(), | |
} | |
} | |
fn releaseSlow(self: Held) void { | |
@setCold(true); | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, &self.lock.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
1, | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
}; | |
}; | |
}; | |
}; |
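A minimal usage sketch for the file above, against the same 0.7.x/0.8.x-era async Zig; the `Demo` wrapper and the GeneralPurposeAllocator are illustrative only and mirror the test harness further down the gist:

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const loop = Loop.instance.?;
    try loop.init(&gpa.allocator);
    defer loop.deinit();

    const Demo = struct {
        fn entry(l: *Loop) void {
            defer l.shutdown(); // stop the loop once this task is done
            l.yield(); // hop onto the loop's worker threads
        }
    };

    var frame = async Demo.entry(loop);
    loop.run(); // drive tasks until shutdown() is called
    nosuspend await frame;
}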
// ============================================================================
// Second file in the gist: a simpler loop built on std.Thread.Mutex and a
// three-state (Running / Waking / Shutdown) notification scheme.
const std = @import("std"); | |
pub const Loop = struct { | |
mutex: std.Thread.Mutex, | |
event_poller: EventPoller, | |
state: State, | |
spawned: usize, | |
threads: []*std.Thread, | |
allocator: *std.mem.Allocator, | |
run_queue: std.SinglyLinkedList(anyframe), | |
const State = enum(u8) { | |
Running, | |
Waking, | |
Shutdown, | |
}; | |
pub fn init(self: *Loop, allocator: *std.mem.Allocator) !void { | |
self.* = .{ | |
.mutex = .{}, | |
.event_poller = try EventPoller.init(), | |
.state = .Running, | |
.spawned = 0, | |
.threads = &[_]*std.Thread{}, | |
.allocator = allocator, | |
.run_queue = .{}, | |
}; | |
errdefer self.deinit(); | |
var num_threads: usize = 0; | |
if (!std.builtin.single_threaded) | |
num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); | |
self.threads = try allocator.alloc(*std.Thread, num_threads); | |
for (self.threads) |*thread| { | |
thread.* = try std.Thread.spawn(Loop.run, self); | |
self.spawned += 1; | |
} | |
} | |
pub fn deinit(self: *Loop) void { | |
self.shutdown(); | |
for (self.threads[0..self.spawned]) |thread| {
thread.wait(); | |
} | |
self.allocator.free(self.threads); | |
self.event_poller.deinit(); | |
self.* = undefined; | |
} | |
pub fn shutdown(self: *Loop) void { | |
const state = @atomicRmw(State, &self.state, .Xchg, .Shutdown, .SeqCst); | |
if (state != .Waking) { | |
self.event_poller.notify(); | |
} | |
} | |
pub fn run(self: *Loop) void { | |
var is_running = true; | |
while (true) { | |
if (self.pop()) |task| { | |
if (!is_running) self.notify(); | |
is_running = true; | |
resume task.data; | |
continue; | |
} | |
const state = @atomicLoad(State, &self.state, .SeqCst); | |
if (state != .Shutdown) { | |
is_running = false; | |
self.wait(); | |
continue; | |
} | |
self.notify(); | |
return; | |
} | |
} | |
pub const Task = std.SinglyLinkedList(anyframe).Node; | |
pub fn push(self: *Loop, task: *Task) void { | |
{ | |
const held = self.mutex.acquire(); | |
defer held.release(); | |
self.run_queue.prepend(task); | |
} | |
self.notify(); | |
} | |
fn pop(self: *Loop) ?*Task { | |
const held = self.mutex.acquire(); | |
defer held.release(); | |
return self.run_queue.popFirst(); | |
} | |
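// Only the first Running -> Waking transition pokes the poller; wait() flips
// the state back to Running once it drains the notification event.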
fn notify(self: *Loop) void { | |
if (@cmpxchgStrong(State, &self.state, .Running, .Waking, .SeqCst, .SeqCst) == null) { | |
self.event_poller.notify(); | |
} | |
} | |
fn wait(self: *Loop) void { | |
var events = self.event_poller.poll(); | |
while (events.next()) |task_ptr| { | |
if (@intToPtr(?*Task, task_ptr)) |task| { | |
self.push(task); | |
continue; | |
} | |
_ = @cmpxchgStrong( | |
State, | |
&self.state, | |
.Waking, | |
.Running, | |
.SeqCst, | |
.SeqCst, | |
); | |
} | |
} | |
const EventPoller = switch (std.builtin.os.tag) { | |
.linux => LinuxPoller, | |
.windows => WindowsPoller, | |
.macos, .openbsd, .freebsd, .netbsd, .dragonfly => BsdPoller, | |
else => @compileError("TODO: use select()"), | |
}; | |
const WindowsPoller = @compileError("TODO: IOCP + AFD"); | |
const LinuxPoller = struct { | |
epoll_fd: std.os.fd_t, | |
notify_fd: std.os.fd_t, | |
did_register_notify_fd: bool, | |
pub fn init() !LinuxPoller { | |
const epoll_fd = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC); | |
errdefer std.os.close(epoll_fd); | |
const notify_fd = try std.os.eventfd(0, std.os.EFD_CLOEXEC | std.os.EFD_NONBLOCK); | |
errdefer std.os.close(notify_fd); | |
return .{ | |
.epoll_fd = epoll_fd, | |
.notify_fd = notify_fd, | |
.did_register_notify_fd = false, | |
}; | |
} | |
pub fn deinit(self: *LinuxPoller) void { | |
std.os.close(self.notify_fd); | |
std.os.close(self.epoll_fd); | |
self.* = undefined; | |
} | |
pub fn notify(self: *LinuxPoller) void { | |
self.arm(self.notify_fd, 0, std.os.EPOLLOUT | std.os.EPOLLONESHOT) catch unreachable; | |
} | |
pub fn wait(self: *LinuxPoller, fd: std.os.fd_t, event: enum{ read, write }) !void { | |
var task = Task{ .data = @frame() }; | |
var err: ?std.os.EpollCtlError = null; | |
suspend { | |
self.arm(fd, @ptrToInt(&task), switch (event) { | |
.read => std.os.EPOLLIN | std.os.EPOLLONESHOT | std.os.EPOLLRDHUP, | |
.write => std.os.EPOLLOUT | std.os.EPOLLONESHOT, | |
}) catch |e| { | |
err = e; | |
Loop.instance.?.push(&task); | |
}; | |
} | |
if (err) |wait_err| { | |
return wait_err; | |
} | |
} | |
fn arm(self: *LinuxPoller, fd: std.os.fd_t, data: usize, flags: u32) !void { | |
var event = std.os.epoll_event{ | |
.events = flags, | |
.data = .{ .ptr = data }, | |
}; | |
std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_MOD, fd, &event) catch |err| switch (err) { | |
error.FileDescriptorNotRegistered => { | |
try std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_ADD, fd, &event); | |
}, | |
else => return err, | |
}; | |
} | |
pub fn poll(self: *LinuxPoller) Poll { | |
var poll: Poll = undefined; | |
poll.active = std.os.epoll_wait(self.epoll_fd, &poll.events, -1); | |
poll.index = 0; | |
return poll; | |
} | |
pub const Poll = struct { | |
events: [64]std.os.epoll_event, | |
active: usize, | |
index: usize, | |
pub fn next(self: *Poll) ?usize { | |
if (self.index >= self.active) { | |
return null; | |
} | |
const event = self.events[self.index]; | |
self.index += 1; | |
return event.data.ptr; | |
} | |
}; | |
}; | |
const BsdPoller = struct { | |
kqueue_fd: std.os.fd_t, | |
pub fn init() !BsdPoller { | |
return BsdPoller{ .kqueue_fd = try std.os.kqueue() }; | |
} | |
pub fn deinit(self: *BsdPoller) void { | |
std.os.close(self.kqueue_fd); | |
self.* = undefined; | |
} | |
pub fn notify(self: *BsdPoller) void { | |
self.arm(self.kqueue_fd, 0, std.os.EVFILT_USER) catch unreachable; | |
} | |
pub fn wait(self: *BsdPoller, fd: std.os.fd_t, event: enum{ read, write }) !void { | |
var task = Task{ .data = @frame() }; | |
var err: ?std.os.KEventError = null; | |
suspend { | |
self.arm(fd, @ptrToInt(&task), switch (event) { | |
.read => std.os.EVFILT_READ, | |
.write => std.os.EVFILT_WRITE, | |
}) catch |e| { | |
err = e; | |
Loop.instance.?.push(&task); | |
}; | |
} | |
if (err) |wait_err| { | |
return wait_err; | |
} | |
} | |
fn arm(self: *BsdPoller, fd: std.os.fd_t, data: usize, filter: u32) !void { | |
var events: [1]std.os.Kevent = undefined; | |
events[0] = .{ | |
.ident = fd, | |
.filter = filter, | |
.flags = std.os.EV_ADD | std.os.EV_ENABLE | std.os.EV_ONESHOT, | |
.fflags = 0, | |
.data = 0, | |
.udata = data, | |
}; | |
_ = try std.os.kevent( | |
self.kqueue_fd, | |
&events, | |
&[0]std.os.Kevent{},
null, | |
); | |
} | |
pub fn poll(self: *BsdPoller) Poll { | |
var poll: Poll = undefined; | |
poll.index = 0; | |
poll.active = std.os.kevent( | |
self.kqueue_fd, | |
&[0]std.os.Kevent{},
&poll.events, | |
null, | |
) catch 0; | |
return poll; | |
} | |
pub const Poll = struct { | |
events: [64]std.os.Kevent, | |
active: usize, | |
index: usize, | |
pub fn next(self: *Poll) ?usize { | |
if (self.index >= self.active) { | |
return null; | |
} | |
const event = self.events[self.index]; | |
self.index += 1; | |
return event.udata; | |
} | |
}; | |
}; | |
}; | |
// ============================================================================
// Third file in the gist: the same loop with an `is_notified` flag, plus a
// futex-style `sync` namespace (RwLock, Condvar, Mutex, WaitGroup, Semaphore,
// Channel) and a small send/recv benchmark harness.
const std = @import("std"); | |
pub const Loop = struct { | |
mutex: std.Thread.Mutex, | |
is_running: bool, | |
is_notified: bool, | |
event_poller: EventPoller, | |
spawned: usize, | |
threads: []*std.Thread, | |
allocator: *std.mem.Allocator, | |
run_queue: std.SinglyLinkedList(anyframe), | |
var global_instance: Loop = undefined; | |
pub const instance: ?*Loop = &global_instance; | |
pub fn init(self: *Loop, allocator: *std.mem.Allocator) !void { | |
self.* = .{ | |
.mutex = .{}, | |
.is_running = true, | |
.is_notified = false, | |
.event_poller = try EventPoller.init(), | |
.spawned = 0, | |
.threads = &[_]*std.Thread{}, | |
.allocator = allocator, | |
.run_queue = .{}, | |
}; | |
errdefer self.deinit(); | |
var num_threads: usize = 0; | |
if (!std.builtin.single_threaded) | |
num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); | |
self.threads = try allocator.alloc(*std.Thread, num_threads); | |
for (self.threads) |*thread| { | |
thread.* = try std.Thread.spawn(Loop.run, self); | |
self.spawned += 1; | |
} | |
} | |
pub fn deinit(self: *Loop) void { | |
self.shutdown(); | |
for (self.threads[0..self.spawned]) |thread| {
thread.wait(); | |
} | |
self.allocator.free(self.threads); | |
self.event_poller.deinit(); | |
self.* = undefined; | |
} | |
pub fn shutdown(self: *Loop) void { | |
@atomicStore(bool, &self.is_running, false, .SeqCst); | |
self.notify(); | |
} | |
pub fn run(self: *Loop) void { | |
var was_waiting = true; | |
while (true) { | |
if (self.pop()) |task| { | |
if (was_waiting) self.notify(); | |
was_waiting = false; | |
resume task.data; | |
continue; | |
} | |
if (@atomicLoad(bool, &self.is_running, .SeqCst)) { | |
self.wait(); | |
was_waiting = true; | |
continue; | |
} | |
self.notify(); | |
return; | |
} | |
} | |
pub const Task = std.SinglyLinkedList(anyframe).Node; | |
pub fn push(self: *Loop, task: *Task) void { | |
{ | |
const held = self.mutex.acquire(); | |
defer held.release(); | |
self.run_queue.prepend(task); | |
} | |
self.notify(); | |
} | |
fn pop(self: *Loop) ?*Task { | |
const held = self.mutex.acquire(); | |
defer held.release(); | |
return self.run_queue.popFirst(); | |
} | |
fn notify(self: *Loop) void { | |
if (@cmpxchgStrong(bool, &self.is_notified, false, true, .SeqCst, .SeqCst) == null) { | |
self.event_poller.notify(); | |
} | |
} | |
fn wait(self: *Loop) void { | |
var events = self.event_poller.poll(); | |
while (events.next()) |task_ptr| { | |
if (@intToPtr(?*Task, task_ptr)) |task| { | |
self.push(task); | |
} else { | |
@atomicStore(bool, &self.is_notified, false, .SeqCst); | |
} | |
} | |
} | |
const EventPoller = switch (std.builtin.os.tag) { | |
.linux => LinuxPoller, | |
.windows => WindowsPoller, | |
.macos, .openbsd, .freebsd, .netbsd, .dragonfly => BsdPoller, | |
else => @compileError("TODO: use select()"), | |
}; | |
const WindowsPoller = @compileError("TODO: IOCP + AFD"); | |
const LinuxPoller = struct { | |
epoll_fd: std.os.fd_t, | |
notify_fd: std.os.fd_t, | |
did_register_notify_fd: bool, | |
pub fn init() !LinuxPoller { | |
const epoll_fd = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC); | |
errdefer std.os.close(epoll_fd); | |
const notify_fd = try std.os.eventfd(0, std.os.EFD_CLOEXEC | std.os.EFD_NONBLOCK); | |
errdefer std.os.close(notify_fd); | |
return LinuxPoller{ | |
.epoll_fd = epoll_fd, | |
.notify_fd = notify_fd, | |
.did_register_notify_fd = false, | |
}; | |
} | |
pub fn deinit(self: *LinuxPoller) void { | |
std.os.close(self.notify_fd); | |
std.os.close(self.epoll_fd); | |
self.* = undefined; | |
} | |
pub fn notify(self: *LinuxPoller) void { | |
self.arm(self.notify_fd, 0, std.os.EPOLLOUT | std.os.EPOLLONESHOT) catch unreachable; | |
} | |
pub fn wait(self: *LinuxPoller, fd: std.os.fd_t, event: enum{ read, write }) !void { | |
var task = Task{ .data = @frame() }; | |
var err: ?std.os.EpollCtlError = null; | |
suspend { | |
self.arm(fd, @ptrToInt(&task), switch (event) { | |
.read => std.os.EPOLLIN | std.os.EPOLLONESHOT | std.os.EPOLLRDHUP, | |
.write => std.os.EPOLLOUT | std.os.EPOLLONESHOT, | |
}) catch |e| { | |
err = e; | |
Loop.instance.?.push(&task); | |
}; | |
} | |
if (err) |wait_err| { | |
return wait_err; | |
} | |
} | |
fn arm(self: *LinuxPoller, fd: std.os.fd_t, data: usize, flags: u32) !void { | |
var event = std.os.epoll_event{ | |
.events = flags, | |
.data = .{ .ptr = data }, | |
}; | |
std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_MOD, fd, &event) catch |err| switch (err) { | |
error.FileDescriptorNotRegistered => { | |
try std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_ADD, fd, &event); | |
}, | |
else => return err, | |
}; | |
} | |
pub fn poll(self: *LinuxPoller) Polled { | |
var polled: Polled = undefined; | |
polled.active = std.os.epoll_wait(self.epoll_fd, &polled.events, -1); | |
polled.index = 0; | |
return polled; | |
} | |
pub const Polled = struct { | |
events: [64]std.os.epoll_event, | |
active: usize, | |
index: usize, | |
pub fn next(self: *Polled) ?usize { | |
if (self.index >= self.active) { | |
return null; | |
} | |
const event = self.events[self.index]; | |
self.index += 1; | |
return event.data.ptr; | |
} | |
}; | |
}; | |
const BsdPoller = struct { | |
kqueue_fd: std.os.fd_t, | |
pub fn init() !BsdPoller { | |
return BsdPoller{ .kqueue_fd = try std.os.kqueue() }; | |
} | |
pub fn deinit(self: *BsdPoller) void { | |
std.os.close(self.kqueue_fd); | |
self.* = undefined; | |
} | |
pub fn notify(self: *BsdPoller) void { | |
self.arm(self.kqueue_fd, 0, std.os.EVFILT_USER) catch unreachable; | |
} | |
pub fn wait(self: *BsdPoller, fd: std.os.fd_t, event: enum{ read, write }) !void { | |
var task = Task{ .data = @frame() }; | |
var err: ?std.os.KEventError = null; | |
suspend { | |
self.arm(fd, @ptrToInt(&task), switch (event) { | |
.read => std.os.EVFILT_READ, | |
.write => std.os.EVFILT_WRITE, | |
}) catch |e| { | |
err = e; | |
Loop.instance.?.push(&task); | |
}; | |
} | |
if (err) |wait_err| { | |
return wait_err; | |
} | |
} | |
fn arm(self: *BsdPoller, fd: std.os.fd_t, data: usize, filter: u32) !void { | |
var events: [1]std.os.Kevent = undefined; | |
events[0] = .{ | |
.ident = fd, | |
.filter = filter, | |
.flags = std.os.EV_ADD | std.os.EV_ENABLE | std.os.EV_ONESHOT, | |
.fflags = 0, | |
.data = 0, | |
.udata = data, | |
}; | |
_ = try std.os.kevent( | |
self.kqueue_fd, | |
&events, | |
&[0]std.os.Kevent{},
null, | |
); | |
} | |
pub fn poll(self: *BsdPoller) Polled { | |
var polled: Polled = undefined; | |
polled.index = 0; | |
polled.active = std.os.kevent( | |
self.kqueue_fd, | |
&[0]std.os.Kevent{},
&polled.events, | |
null, | |
) catch 0; | |
return polled; | |
} | |
pub const Polled = struct { | |
events: [64]std.os.Kevent, | |
active: usize, | |
index: usize, | |
pub fn next(self: *Polled) ?usize { | |
if (self.index >= self.active) { | |
return null; | |
} | |
const event = self.events[self.index]; | |
self.index += 1; | |
return event.udata; | |
} | |
}; | |
}; | |
}; | |
const sync = struct { | |
pub const RwLock = struct { | |
state: usize = 0, | |
const READING = 1; | |
const WRITING = std.math.maxInt(usize); | |
pub fn tryAcquire(self: *RwLock) ?Held { | |
if (@cmpxchgStrong(usize, &self.state, 0, WRITING, .SeqCst, .SeqCst) == null) { | |
return Held{ .lock = self }; | |
} else { | |
return null; | |
} | |
} | |
pub fn acquire(self: *RwLock) Held { | |
while (true) { | |
if (self.tryAcquire()) |held| { | |
return held; | |
} | |
var waiter = Futex.wait(@ptrToInt(self)); | |
switch (@atomicLoad(usize, &self.state, .SeqCst)) { | |
0 => waiter.cancel(), | |
else => waiter.wait(), | |
} | |
} | |
} | |
pub fn tryAcquireShared(self: *RwLock) ?Held { | |
var state = @atomicLoad(usize, &self.state, .SeqCst); | |
while (true) { | |
if (state == WRITING) { | |
return null; | |
} | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
state + READING, | |
.SeqCst, | |
.SeqCst, | |
) orelse return Held{ | |
.is_shared = true, | |
.lock = self, | |
}; | |
} | |
} | |
pub fn acquireShared(self: *RwLock) Held { | |
while (true) { | |
if (self.tryAcquireShared()) |held| { | |
return held; | |
} | |
var waiter = Futex.wait(@ptrToInt(self)); | |
switch (@atomicLoad(usize, &self.state, .SeqCst)) { | |
WRITING => waiter.wait(), | |
else => waiter.cancel(), | |
} | |
} | |
} | |
pub const Held = struct { | |
is_shared: bool = false, | |
lock: *RwLock, | |
pub fn release(self: Held) void { | |
if (self.is_shared) { | |
self.lock.releaseShared(); | |
} else { | |
self.lock.release(); | |
} | |
} | |
}; | |
pub fn release(self: *RwLock) void { | |
@atomicStore(usize, &self.state, 0, .SeqCst); | |
Futex.wake(@ptrToInt(self), 2); | |
} | |
pub fn releaseShared(self: *RwLock) void { | |
const state = @atomicRmw(usize, &self.state, .Sub, READING, .SeqCst); | |
if (state == READING) { | |
Futex.wake(@ptrToInt(self), 1); | |
} | |
} | |
}; | |
pub const Condvar = struct { | |
was_notified: bool = false, | |
pub fn wait(self: *Condvar, held: Mutex.Held) void { | |
var waiter = Futex.wait(@ptrToInt(self)); | |
if (@atomicLoad(bool, &self.was_notified, .SeqCst)) { | |
@atomicStore(bool, &self.was_notified, false, .SeqCst); | |
waiter.cancel(); | |
return; | |
} | |
held.release(); | |
waiter.wait(); | |
_ = held.mutex.acquire(); | |
} | |
pub fn signal(self: *Condvar) void { | |
@atomicStore(bool, &self.was_notified, true, .SeqCst); | |
Futex.wake(@ptrToInt(self), 1); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
@atomicStore(bool, &self.was_notified, true, .SeqCst); | |
Futex.wake(@ptrToInt(self), std.math.maxInt(usize)); | |
} | |
}; | |
pub const Mutex = struct { | |
is_held: bool = false, | |
pub fn tryAcquire(self: *Mutex) ?Held { | |
if (@cmpxchgStrong(bool, &self.is_held, false, true, .SeqCst, .SeqCst) == null) { | |
return Held{ .mutex = self }; | |
} else { | |
return null; | |
} | |
} | |
pub fn acquire(self: *Mutex) Held { | |
while (true) { | |
if (self.tryAcquire()) |held| { | |
return held; | |
} | |
var waiter = Futex.wait(@ptrToInt(self)); | |
if (@atomicLoad(bool, &self.is_held, .SeqCst)) { | |
waiter.wait(); | |
} else { | |
waiter.cancel(); | |
} | |
} | |
} | |
pub const Held = struct { | |
mutex: *Mutex, | |
pub fn release(self: Held) void { | |
@atomicStore(bool, &self.mutex.is_held, false, .SeqCst); | |
Futex.wake(@ptrToInt(self.mutex), 1); | |
} | |
}; | |
}; | |
pub const WaitGroup = struct { | |
active: usize = 0, | |
pub fn init(active: usize) WaitGroup { | |
return WaitGroup{ .active = active }; | |
} | |
pub fn done(self: *WaitGroup) void { | |
self.add(-1); | |
} | |
pub fn add(self: *WaitGroup, delta: isize) void { | |
var active: usize = undefined; | |
defer if (active == 0) { | |
Futex.wake(@ptrToInt(self), std.math.maxInt(usize)); | |
}; | |
if (delta < 0) { | |
active = @atomicRmw(usize, &self.active, .Sub, @intCast(usize, -delta), .SeqCst); | |
} else { | |
active = @atomicRmw(usize, &self.active, .Add, @intCast(usize, delta), .SeqCst); | |
} | |
if (delta < 0) { | |
active -= @intCast(usize, -delta); | |
} else { | |
active += @intCast(usize, delta); | |
} | |
} | |
pub fn wait(self: *WaitGroup) void { | |
while (true) { | |
if (@atomicLoad(usize, &self.active, .SeqCst) == 0) { | |
return; | |
} | |
var waiter = Futex.wait(@ptrToInt(self)); | |
switch (@atomicLoad(usize, &self.active, .SeqCst)) { | |
0 => waiter.cancel(), | |
else => waiter.wait(), | |
} | |
} | |
} | |
}; | |
pub const Semaphore = struct { | |
permits: usize = 0, | |
pub fn init(permits: usize) Semaphore { | |
return Semaphore{ .permits = permits }; | |
} | |
pub fn post(self: *Semaphore) void { | |
_ = @atomicRmw(usize, &self.permits, .Add, 1, .SeqCst); | |
Futex.wake(@ptrToInt(self), 1); | |
} | |
pub fn tryWait(self: *Semaphore) bool { | |
var permits = @atomicLoad(usize, &self.permits, .SeqCst); | |
while (true) { | |
if (permits == 0) { | |
return false; | |
} | |
permits = @cmpxchgWeak( | |
usize, | |
&self.permits, | |
permits, | |
permits - 1, | |
.SeqCst, | |
.SeqCst, | |
) orelse return true; | |
} | |
} | |
pub fn wait(self: *Semaphore) void { | |
while (true) { | |
if (self.tryWait()) { | |
return; | |
} | |
var waiter = Futex.wait(@ptrToInt(self)); | |
switch (@atomicLoad(usize, &self.permits, .SeqCst)) { | |
0 => waiter.wait(), | |
else => waiter.cancel(), | |
} | |
} | |
} | |
}; | |
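// A bounded MPMC channel built on the async-aware Mutex/Condvar above:
// blocked readers and writers park on the futex table instead of an OS
// primitive, so they only occupy an async frame while waiting.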
pub fn Channel(comptime T: type) type { | |
return struct { | |
mutex: Mutex = .{}, | |
readers: Condvar = .{}, | |
writers: Condvar = .{}, | |
head: usize = 0, | |
size: usize = 0, | |
capacity: usize, | |
buffer: if (@sizeOf(T) > 0) [*]T else void, | |
const Self = @This(); | |
pub fn init(items: []T) Self { | |
return Self{ | |
.capacity = items.len, | |
.buffer = if (@sizeOf(T) > 0) items.ptr else {}, | |
}; | |
} | |
pub fn tryWriteItem(self: *Self, item: T) bool { | |
return self.tryWrite(&[_]T{ item }) == 1; | |
} | |
pub fn tryReadItem(self: *Self) ?T { | |
var items: [1]T = undefined; | |
return if (self.tryRead(&items) == 1) items[0] else null; | |
} | |
pub fn writeItem(self: *Self, item: T) void { | |
return self.write(&[_]T{ item }); | |
} | |
pub fn readItem(self: *Self) T { | |
var items: [1]T = undefined; | |
self.read(&items); | |
return items[0]; | |
} | |
pub fn tryWrite(self: *Self, items: []const T) usize { | |
return self.writeItems(items, false); | |
} | |
pub fn tryRead(self: *Self, items: []T) usize { | |
return self.readItems(items, false); | |
} | |
pub fn write(self: *Self, items: []const T) void { | |
std.debug.assert(self.writeItems(items, true) == items.len); | |
} | |
pub fn read(self: *Self, items: []T) void { | |
std.debug.assert(self.readItems(items, true) == items.len); | |
} | |
fn writeItems(self: *Self, items: []const T, comptime can_block: bool) usize { | |
const held = self.mutex.acquire(); | |
defer held.release(); | |
var produced: usize = 0; | |
while (produced < items.len) { | |
if (self.size < self.capacity) { | |
if (@sizeOf(T) > 0) { | |
self.buffer[(self.head +% self.size) % self.capacity] = items[produced]; | |
} | |
self.readers.signal(); | |
self.size += 1; | |
produced += 1; | |
continue; | |
} | |
switch (can_block) { | |
true => self.writers.wait(held), | |
else => break, | |
} | |
} | |
return produced; | |
} | |
fn readItems(self: *Self, items: []T, comptime can_block: bool) usize { | |
const held = self.mutex.acquire(); | |
defer held.release(); | |
var consumed: usize = 0; | |
while (consumed < items.len) { | |
if (self.size > 0) { | |
if (@sizeOf(T) > 0) { | |
items.ptr[consumed] = self.buffer[self.head % self.capacity]; | |
} | |
self.writers.signal(); | |
self.head +%= 1; | |
self.size -= 1; | |
consumed += 1; | |
continue; | |
} | |
switch (can_block) { | |
true => self.readers.wait(held), | |
else => break, | |
} | |
} | |
return consumed; | |
} | |
}; | |
} | |
pub fn yield() void { | |
suspend { | |
var task = Loop.Task{ .data = @frame() }; | |
Futex.getLoop().push(&task); | |
} | |
} | |
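// A parking-lot style futex emulation: waiters hash their address into one of
// 256 buckets and park their frame in that bucket's queue; wake() walks the
// queue and reschedules up to max_waiters frames waiting on the same address.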
const Futex = struct { | |
fn getLoop() *Loop { | |
return Loop.instance orelse { | |
@compileError("Must have an event loop enabled"); | |
}; | |
} | |
const Queue = std.TailQueue(struct { | |
address: usize, | |
task: Loop.Task, | |
}); | |
const Bucket = struct { | |
mutex: std.Thread.Mutex = .{}, | |
queue: Queue = .{}, | |
waiters: usize = 0, | |
var array = [_]Bucket{.{}} ** 256; | |
fn from(address: usize) *Bucket { | |
return &array[address % array.len]; | |
} | |
}; | |
pub const Waiter = struct { | |
held: @TypeOf(@as(std.Thread.Mutex, undefined).impl).Held, | |
bucket: *Bucket, | |
node: Queue.Node, | |
pub fn cancel(self: *Waiter) void { | |
_ = @atomicRmw(usize, &self.bucket.waiters, .Sub, 1, .SeqCst); | |
self.held.release(); | |
} | |
pub fn wait(self: *Waiter) void { | |
suspend { | |
self.node.data.task = Loop.Task{ .data = @frame() }; | |
self.bucket.queue.append(&self.node); | |
self.held.release(); | |
} | |
} | |
}; | |
pub fn wait(address: usize) Waiter { | |
var waiter: Waiter = undefined; | |
waiter.node.data.address = address; | |
waiter.bucket = Bucket.from(address); | |
waiter.held = waiter.bucket.mutex.acquire(); | |
_ = @atomicRmw(usize, &waiter.bucket.waiters, .Add, 1, .SeqCst); | |
return waiter; | |
} | |
pub fn wake(address: usize, max_waiters: usize) void { | |
var awoken = Queue{}; | |
defer while (awoken.pop()) |node| { | |
getLoop().push(&node.data.task); | |
}; | |
const bucket = Bucket.from(address); | |
if (@atomicLoad(usize, &bucket.waiters, .SeqCst) == 0) { | |
return; | |
} | |
const held = bucket.mutex.acquire(); | |
defer held.release(); | |
var woke_up: usize = 0; | |
defer if (woke_up > 0) { | |
_ = @atomicRmw(usize, &bucket.waiters, .Sub, woke_up, .SeqCst); | |
}; | |
var current_node = bucket.queue.first; | |
while (current_node) |node| { | |
current_node = node.next; | |
if (node.data.address != address) { | |
continue; | |
} | |
bucket.queue.remove(node); | |
awoken.append(node); | |
woke_up += 1; | |
if (woke_up >= max_waiters) { | |
break; | |
} | |
} | |
} | |
}; | |
}; | |
pub fn main() !void { | |
try testHarness(testSendRecv); | |
} | |
fn testHarness(comptime asyncFn: anytype) !@typeInfo(@TypeOf(asyncFn)).Fn.return_type.? { | |
var gpa = std.heap.GeneralPurposeAllocator(.{}){}; | |
var heap = if (std.builtin.os.tag == .windows) std.heap.HeapAllocator.init() else {}; | |
const allocator = if (std.builtin.link_libc) | |
std.heap.c_allocator | |
else if (std.builtin.os.tag == .windows) | |
&heap.allocator | |
else | |
&gpa.allocator; | |
const loop = Loop.instance.?; | |
try loop.init(allocator); | |
defer loop.deinit(); | |
const Wrapper = struct { | |
fn entry(l: *Loop) @typeInfo(@TypeOf(asyncFn)).Fn.return_type.? { | |
defer l.shutdown(); | |
return asyncFn(); | |
} | |
}; | |
var frame = async Wrapper.entry(loop); | |
loop.run(); | |
return nosuspend await frame; | |
} | |
fn testSendRecv() !void { | |
const ITEMS = 1024; | |
const PRODUCE = 1_000_000; | |
const CONSUME = PRODUCE; | |
const Consumer = struct { | |
fn run(ch: *sync.Channel(u8), wg: *sync.WaitGroup) void { | |
sync.yield(); | |
defer wg.done(); | |
var items: usize = CONSUME; | |
while (items > 0) : (items -= 1) { | |
// std.debug.warn("reading {}\n", .{items}); | |
_ = ch.readItem(); | |
} | |
} | |
}; | |
const allocator = std.heap.page_allocator; | |
const buffer = try allocator.alloc(u8, ITEMS); | |
defer allocator.free(buffer); | |
var ch = sync.Channel(u8).init(buffer); | |
var wg = sync.WaitGroup.init(1); | |
defer wg.wait(); | |
var consumer = async Consumer.run(&ch, &wg); | |
defer await consumer; | |
var items: usize = PRODUCE; | |
while (items > 0) : (items -= 1) { | |
// std.debug.warn("writing {}\n", .{items}); | |
ch.writeItem(0); | |
} | |
// ch := make(chan byte, 4096) | |
// wg := sync.WaitGroup{} | |
// wg.Add(1) | |
// go func() { | |
// defer wg.Done() | |
// for range ch { | |
// } | |
// }() | |
// b.SetBytes(1) | |
// b.ReportAllocs() | |
// b.ResetTimer() | |
// for i := 0; i < b.N; i++ { | |
// ch <- byte(i) | |
// } | |
// close(ch) | |
// wg.Wait() | |
} |
// ============================================================================
// Fourth file in the gist: a work-in-progress thread-pool scheduler with
// prioritized run queues and an idle-worker list.
const std = @import("std"); | |
pub const Scheduler = struct { | |
counter: usize = 0, | |
max_workers: usize, | |
main_worker: ?*Worker = null, | |
thread_config: System.ThreadConfig, | |
idle_lock: System.Lock = .{}, | |
idle_workers: ?*Worker = null, | |
spawned_workers: ?*Worker = null, | |
run_queues: Task.Priority.Array(Node.Queue) = Task.Priority.arrayInit(Node.Queue), | |
pub const Config = struct { | |
max_threads: usize, | |
stack_size: usize, | |
}; | |
pub fn init(config: Config) Scheduler { | |
return Scheduler{ | |
.max_workers = if (std.builtin.single_threaded) 1 else std.math.max(1, config.max_threads), | |
.thread_config = .{ | |
.stack_size = std.math.max(std.mem.page_size, config.stack_size), | |
}, | |
}; | |
} | |
pub fn run(self: *Scheduler) void { | |
self.notify(true); | |
} | |
pub const Task = struct { | |
node: Node = undefined, | |
callback: Callback, | |
pub const Callback = fn(*Worker, *Task) callconv(.C) void; | |
pub const Affinity = struct { | |
priority: Priority = .Normal, | |
worker: ?*Worker = null, | |
}; | |
pub const Priority = enum(u2) { | |
Low = 0, | |
Normal = 1, | |
High = 2, | |
Handoff = 3, | |
fn Array(comptime T: type) type { | |
return [3]T; | |
} | |
fn arrayInit(comptime T: type) Array(T) { | |
return [_]T{.{}} ** 3; | |
} | |
fn toArrayIndex(self: Priority) usize { | |
return switch (self) { | |
.Handoff, .High => 2, | |
.Normal => 1, | |
.Low => 0, | |
}; | |
} | |
}; | |
pub fn init(callback: Callback) Task { | |
return Task{ .callback = callback };
} | |
pub const Group = struct { | |
batch: Node.Batch = .{}, | |
pub fn from(task: *Task) Group { | |
return Group{ .batch = Node.Batch.from(&task.node) }; | |
} | |
pub fn isEmpty(self: Group) bool { | |
return self.batch.isEmpty(); | |
} | |
pub fn append(self: *Group, group: Group) void { | |
self.batch.push(group.batch, .fifo); | |
} | |
pub fn prepend(self: *Group, group: Group) void { | |
self.batch.push(group.batch, .lifo); | |
} | |
pub fn popFirst(self: *Group) ?*Task { | |
const node = self.batch.pop() orelse return null; | |
return @fieldParentPtr(Task, "node", node); | |
} | |
pub fn iter(self: Group) Iter { | |
return Iter{ .node = self.batch.head }; | |
} | |
pub const Iter = struct { | |
node: ?*Node, | |
pub fn next(self: *Iter) ?*Task { | |
const node = self.node orelse return null; | |
self.node = node.next; | |
return @fieldParentPtr(Task, "node", node); | |
} | |
}; | |
}; | |
}; | |
pub fn schedule(self: *Scheduler, group: Task.Group, affinity: Task.Affinity) void { | |
if (group.isEmpty()) { | |
return; | |
} | |
if (affinity.worker) |worker| { | |
worker.run_owned.push(group.batch);
worker.notify(); | |
return; | |
} | |
self.run_queues[affinity.priority.toArrayIndex()].push(group.batch);
self.notify(false); | |
} | |
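// Counter packs the scheduler state plus the idle and spawned worker counts
// into one usize so all three can be updated with a single compare-and-swap.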
const Counter = struct { | |
state: State = .uninit, | |
idle: usize = 0, | |
spawned: usize = 0, | |
const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 2, 2)); | |
const State = enum(u2) { | |
uninit = 0, | |
pending, | |
notified, | |
shutdown, | |
}; | |
fn pack(self: Counter) usize { | |
var value: usize = 0; | |
value |= @enumToInt(self.state); | |
value |= @as(usize, @intCast(Count, self.idle)) << 2; | |
value |= @as(usize, @intCast(Count, self.spawned)) << (2 + std.meta.bitCount(Count)); | |
return value; | |
} | |
fn unpack(value: usize) Counter { | |
return Counter{ | |
.state = @intToEnum(State, @truncate(std.meta.Tag(State), value)), | |
.idle = @truncate(Count, value >> 2), | |
.spawned = @truncate(Count, value >> (2 + std.meta.bitCount(Count))), | |
}; | |
} | |
}; | |
pub fn shutdown(self: *Scheduler) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
if (counter.state == .uninit) { | |
return; | |
} | |
if (counter.state == .shutdown) { | |
return; | |
} | |
var new_counter = counter; | |
new_counter.state = .shutdown; | |
new_counter.idle = 0; | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
if (counter.idle > 0) { | |
self.idleNotify(.all); | |
} | |
return; | |
} | |
} | |
fn markShutdown(self: *Scheduler) void { | |
var counter = Counter{ .spawned = 1 }; | |
counter = Counter.unpack(@atomicRmw(usize, &self.counter, .Sub, counter.pack(), .AcqRel)); | |
if (counter.spawned == 1 and counter.state == .shutdown) { | |
const main_worker = self.main_worker orelse unreachable; | |
main_worker.stop(); | |
} | |
} | |
fn markSpawned(self: *Scheduler) bool { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
switch (counter.state) { | |
.uninit => unreachable, | |
.pending => return true, | |
.notified => {}, | |
.shutdown => { | |
self.markShutdown(); | |
return false; | |
}, | |
} | |
var new_counter = counter; | |
new_counter.state = .pending; | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(),
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
return true; | |
} | |
} | |
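// notify(): wake one idle worker if there is one; otherwise, if the spawn
// limit allows, reserve a new worker slot in the counter and spawn a thread
// (or run the worker inline for the initial call and single-threaded builds).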
fn notify(self: *Scheduler, initialize: bool) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
const can_notify = switch (counter.state) { | |
.uninit => initialize, | |
.pending => true, | |
.notified => false, | |
.shutdown => false, | |
}; | |
if (!can_notify) { | |
return; | |
} | |
var new_counter = counter; | |
new_counter.state = .notified; | |
if (counter.idle == 0 and (counter.spawned < self.max_workers)) { | |
new_counter.spawned += 1; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
if (counter.idle > 0) { | |
self.idleNotify(.one); | |
return; | |
} | |
if (counter.spawned < new_counter.spawned) { | |
if (initialize or std.builtin.single_threaded) { | |
Worker.run(self, null);
return; | |
} | |
if (!Worker.spawn(self)) { | |
self.markShutdown(); | |
} | |
} | |
return; | |
} | |
} | |
fn wait(self: *Scheduler, worker: *Worker) error{Shutdown}!bool { | |
var is_idle = false; | |
var has_local = false; | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
std.debug.assert(counter.state != .uninit); | |
if (counter.state == .shutdown) { | |
self.markShutdown(); | |
return error.Shutdown; | |
} | |
if (counter.state == .notified or !is_idle or has_local) { | |
var new_counter = counter; | |
new_counter.state = .pending;
if (!is_idle) {
new_counter.idle += 1;
} else if (has_local) {
new_counter.idle -= 1;
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
if (counter.state == .notified) { | |
return true; | |
} else if (has_local) { | |
return false; | |
} else { | |
is_idle = true; | |
} | |
} | |
has_local = self.idleWait(worker); | |
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
} | |
} | |
fn idleWait(self: *Scheduler, worker: *Worker) bool { | |
{ | |
self.idle_lock.acquire(); | |
defer self.idle_lock.release(); | |
@atomicStore(bool, &worker.idle_pending, true, .Monotonic); | |
worker.idle_prev = null; | |
worker.idle_next = self.idle_workers; | |
self.idle_workers = worker; | |
} | |
const has_local = worker.wait(); | |
self.idleCancel(worker); | |
return has_local; | |
} | |
fn idleCancel(self: *Scheduler, worker: *Worker) void { | |
if (@atomicLoad(bool, &worker.idle_pending, .Monotonic)) { | |
self.idle_lock.acquire(); | |
defer self.idle_lock.release(); | |
if (@atomicLoad(bool, &worker.idle_pending, .Monotonic)) { | |
@atomicStore(bool, &worker.idle_pending, false, .Monotonic); | |
if (worker.idle_next) |next| { | |
next.idle_prev = worker.idle_prev; | |
} | |
if (worker.idle_prev) |prev| { | |
prev.idle_next = worker.idle_next; | |
} | |
if (worker == self.idle_workers) { | |
self.idle_workers = worker.idle_next; | |
} | |
} | |
} | |
} | |
fn idleNotify(self: *Scheduler, scope: enum{ one, all }) void { | |
var idle_workers: ?*Worker = null; | |
defer while (idle_workers) |worker| { | |
idle_workers = worker.idle_next; | |
worker.notify(); | |
}; | |
self.idle_lock.acquire(); | |
defer self.idle_lock.release(); | |
var max_wake: usize = switch (scope) { | |
.one => 1, | |
.all => std.math.maxInt(usize), | |
}; | |
while (max_wake > 0) : (max_wake -= 1) { | |
const worker = self.idle_workers orelse break; | |
self.idle_workers = worker.idle_next; | |
if (self.idle_workers) |next_worker| { | |
next_worker.idle_prev = null; | |
} | |
@atomicStore(bool, &worker.idle_pending, false, .Monotonic); | |
worker.idle_next = idle_workers; | |
idle_workers = worker; | |
} | |
} | |
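// Each Worker keeps a private run_next batch (used for same-worker and Handoff | |
// scheduling), an MPSC run_owned queue that other workers push affinity-pinned | |
// tasks into, and one queue per priority level; it also links itself into the | |
// scheduler's intrusive idle and spawned lists. | |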
pub const Worker = struct { | |
scheduler: *Scheduler, | |
thread: ?System.Thread, | |
timer: Timer = .{}, | |
join_ptr: usize = 0, | |
idle_ptr: usize = 0, | |
idle_pending: bool = false, | |
idle_prev: ?*Worker = undefined, | |
idle_next: ?*Worker = undefined, | |
spawned_prev: ?*Worker = undefined, | |
spawned_next: ?*Worker = undefined, | |
steal_target: ?*Worker = null, | |
run_next: Node.Batch = .{}, | |
run_owned: Node.Queue = .{}, | |
run_queues: Task.Priority.Array(Node.Queue) = Task.Priority.arrayInit(Node.Queue), | |
fn spawn(scheduler: *Scheduler) bool { | |
const Spawner = struct { | |
put_event: System.Event, | |
got_event: System.Event, | |
sched: *Scheduler, | |
thread: System.Thread, | |
fn runWorker(self_ptr: usize) void { | |
const self = @intToPtr(*Spawner, self_ptr); | |
self.put_event.wait(null) catch unreachable; | |
const sched = self.sched; | |
const thread = self.thread; | |
self.got_event.set(); | |
Worker.run(sched, thread); | |
} | |
}; | |
var spawner: Spawner = undefined; | |
spawner.put_event.init(); | |
spawner.got_event.init(); | |
defer { | |
spawner.put_event.deinit(); | |
spawner.got_event.deinit(); | |
} | |
spawner.sched = scheduler; | |
spawner.thread = System.Thread.spawn( | |
scheduler.thread_config, | |
Spawner.runWorker, | |
@ptrToInt(&spawner), | |
) catch return false; | |
spawner.put_event.set(); | |
spawner.got_event.wait(null) catch unreachable; | |
return true; | |
} | |
fn run(scheduler: *Scheduler, thread: ?System.Thread) void { | |
var self = Worker{ | |
.scheduler = scheduler, | |
.thread = thread, | |
}; | |
var spawned_workers = @atomicLoad(?*Worker, &scheduler.spawned_workers, .Monotonic); | |
while (true) { | |
self.spawned_prev = null; | |
self.spawned_next = spawned_workers; | |
spawned_workers = @cmpxchgWeak( | |
?*Worker, | |
&scheduler.spawned_workers, | |
spawned_workers, | |
&self, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
if (spawned_workers) |next_worker| { | |
next_worker.spawned_prev = &self; | |
} else { | |
scheduler.main_worker = &self; | |
} | |
defer { | |
self.join(); | |
if (@atomicLoad(?*Worker, &self.spawned_prev, .Monotonic)) |prev_worker| { | |
prev_worker.stop(); | |
} | |
} | |
var was_idle = true; | |
if (!scheduler.markSpawned()) { | |
return; | |
} | |
var run_tick: u8 = 0; | |
while (true) { | |
@atomicStore(usize, &self.idle_ptr, 0, .Monotonic); | |
if (self.poll(run_tick == 127)) |node| { | |
if (was_idle) scheduler.notify(false); | |
was_idle = false; | |
run_tick +%= 1; | |
const task = @fieldParentPtr(Task, "node", node); | |
(task.callback)(task, &self); | |
continue; | |
} | |
was_idle = true; | |
_ = scheduler.wait(&self) catch break; | |
} | |
} | |
pub fn getScheduler(self: Worker) *Scheduler { | |
return self.scheduler; | |
} | |
pub fn schedule(self: *Worker, group: Task.Group, affinity: Task.Affinity) void { | |
if (group.isEmpty()) { | |
return; | |
} | |
if (affinity.worker) |worker| { | |
if (worker == self) { | |
self.run_next.push(group.batch, .fifo); | |
return; | |
} | |
worker.run_owned.push(group.batch); | |
worker.notify(); | |
return; | |
} | |
if (affinity.priority == .Handoff) { | |
self.run_next.push(group.batch, .lifo); | |
return; | |
} | |
self.run_queues[affinity.priority.toArrayIndex()].push(group.batch); | |
self.getScheduler().notify(false); | |
} | |
fn poll(self: *Worker, be_fair: bool) ?*Node { | |
const scheduler = self.getScheduler(); | |
if (be_fair) { | |
if (self.pollEvents()) |node| return node; | |
if (self.pollQueues(&scheduler.run_queues, true)) |node| return node; | |
if (self.pollQueues(&self.run_queues, true)) |node| return node; | |
} | |
if (self.pollLocal(be_fair)) |node| { | |
return node; | |
} | |
const steal_attempts: u8 = 4; | |
var attempts: u8 = steal_attempts; | |
while (attempts != 0) : (attempts -= 1) { | |
if (self.pollQueues(&self.run_queues, false)) |node| { | |
return node; | |
} | |
if (self.pollQueues(&scheduler.run_queues, false)) |node| { | |
return node; | |
} | |
var iter = Counter.unpack(@atomicLoad(usize, &scheduler.counter, .Monotonic)).spawned; | |
while (iter != 0) : (iter -= 1) { | |
const target = self.steal_target | |
orelse @atomicLoad(?*Worker, &scheduler.spawned_workers, .Acquire) | |
orelse unreachable; | |
if (target != self) { | |
const be_aggressive = attempts <= (steal_attempts / 2); | |
if (self.pollSteal(target, be_aggressive)) |node| { | |
self.steal_target = target; | |
return node; | |
} | |
} | |
self.steal_target = target.spawned_next; | |
} | |
} | |
return self.pollEvents(); | |
} | |
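// Poll order: when be_fair, service I/O events and drain the global/local | |
// priority queues lowest-priority-first; otherwise prefer run_next/run_owned/ | |
// timers, then make a few attempts at the local and global priority queues and | |
// at stealing from sibling workers, falling back to polling I/O events. | |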
fn pollNext(self: *Worker) ?*Node { | |
return self.run_next.pop(); | |
} | |
fn pollOwned(self: *Worker) ?*Node { | |
return self.run_owned.popAssumeOwned(); | |
} | |
fn pollLocal(self: *Worker, be_fair: bool) ?*Node { | |
const PollType = enum{ next, owned, timers }; | |
var poll_order = [_]PollType{ .next, .owned, .timers }; | |
if (be_fair) { | |
poll_order = [_]PollType{ .timers, .owned, .next }; | |
} | |
for (poll_order) |poll_type| { | |
if (switch (poll_type) { | |
.next => self.pollNext(), | |
.owned => self.pollOwned(), | |
.timers => self.pollTimers(self), | |
}) |node| { | |
return node; | |
} | |
} | |
return null; | |
} | |
fn pollSteal(self: *Worker, target: *Worker, be_aggressive: bool) ?*Node { | |
if (self.pollQueues(&target.run_queues, false)) |node| { | |
return node; | |
} | |
if (be_aggressive) { | |
if (self.pollTimers(target)) |node| { | |
return node; | |
} | |
} | |
return null; | |
} | |
fn pollQueues(self: *Worker, queues: *Task.Priority.Array(Node.Queue), be_fair: bool) ?*Node { | |
var poll_order = [_]Task.Priority{ .High, .Normal, .Low }; | |
if (be_fair) { | |
poll_order = [_]Task.Priority{ .Low, .Normal, .High }; | |
} | |
for (poll_order) |priority| { | |
if (queues[priority.toArrayIndex()].pop()) |node| { | |
return node; | |
} | |
} | |
return null; | |
} | |
fn pollTimers(self: *Worker, target: *Worker) ?*Node { | |
var group = switch (target.timer.poll()) { | |
.expired => |expired| expired, | |
else => return null, | |
}; | |
const first_node = group.pop(); | |
self.run_queues[Task.Priority.High.toArrayIndex()].push(group); | |
return first_node; | |
} | |
fn pollEvents(self: *Worker) ?*Node { | |
// TODO: poll the I/O driver; nothing to return until that exists | |
return null; | |
} | |
fn wait(self: *Worker) bool { | |
@compileError("TODO"); | |
} | |
fn notify(self: *Worker) void { | |
@compileError("TODO"); | |
} | |
fn join(self: *Worker) void { | |
var event: System.Event = undefined; | |
var has_event = false; | |
defer if (has_event) { | |
event.deinit(); | |
}; | |
var join_ptr = @atomicLoad(usize, &self.join_ptr, .Acquire); | |
while (true) { | |
if (join_ptr == 1) { | |
return; | |
} | |
if (!has_event) { | |
has_event = true; | |
event.init(); | |
} | |
join_ptr = @cmpxchgWeak( | |
usize, | |
&self.join_ptr, | |
join_ptr, | |
@ptrToInt(&event), | |
.AcqRel, | |
.Acquire, | |
) orelse { | |
event.wait(null) catch unreachable; | |
return; | |
}; | |
} | |
} | |
fn stop(self: *Worker) void { | |
const join_ptr = @atomicRmw(usize, &self.join_ptr, .Xchg, 1, .AcqRel); | |
if (@intToPtr(?*System.Event, join_ptr)) |event| { | |
event.set(); | |
} | |
} | |
}; | |
const Timer = struct { | |
tree_lock: System.Lock = System.Lock{}, | |
has_pending: bool = false, | |
tree: Tree = .{}, | |
const Tree = Treap(struct { | |
deadline: System.Instant, | |
pub const use_min = true; | |
const Key = @This(); | |
pub fn isLessThan(left: Key, right: Key) bool { | |
if (isEqual(left, right)) return false; | |
return right.deadline.elapsed(left.deadline) != null; | |
} | |
pub fn isEqual(left: Key, right: Key) bool { | |
return right.deadline.isEqual(left.deadline); | |
} | |
pub fn isMoreImportant(left: Key, right: Key) bool { | |
return !isLessThan(left, right); | |
} | |
}); | |
pub const Node = struct { | |
tree_node: Tree.Node, | |
is_pending: bool, | |
group: Task.Group, | |
timer: *Timer, | |
prev: ?*Node, | |
next: ?*Node, | |
tail: *Node, | |
}; | |
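// Timer nodes that round to the same millisecond deadline share one tree entry: | |
// the first node goes into the treap and later nodes are appended to its | |
// prev/next chain, with head.tail tracking the end of the chain. | |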
pub fn schedule(self: *Timer, node: *Node, deadline: System.Instant, group: Task.Group) void { | |
self.tree_lock.acquire(); | |
defer self.tree_lock.release(); | |
node.prev = null; | |
node.next = null; | |
node.tail = node; | |
node.timer = self; | |
node.group = group; | |
@atomicStore(bool, &node.is_pending, true, .Monotonic); | |
@atomicStore(bool, &self.has_pending, true, .Monotonic); | |
const deadline_ms = deadline.round(std.time.ns_per_ms); | |
const key = Tree.Key{ .deadline = deadline_ms }; | |
const lookup = self.tree.find(key); | |
const head_node = lookup.node orelse { | |
self.tree.insert(lookup, &node.tree_node, key); | |
return; | |
}; | |
const head = @fieldParentPtr(Node, "tree_node", head_node); | |
node.tree_node = head.tree_node; | |
node.prev = head.tail; | |
head.tail.next = node; | |
head.tail = node; | |
} | |
pub fn tryCancel(self: *Timer, node: *Node) bool { | |
if (!@atomicLoad(bool, &node.is_pending, .Monotonic)) { | |
return false; | |
} | |
self.tree_lock.acquire(); | |
defer self.tree_lock.release(); | |
if (!@atomicLoad(bool, &node.is_pending, .Monotonic)) { | |
return false; | |
} | |
if (node.prev) |prev| { | |
prev.next = node.next; | |
if (node.next) |next| { | |
next.prev = prev; | |
} else { | |
const lookup = self.tree.find(node.tree_node.key); | |
const tree_node = lookup.node orelse unreachable; | |
const head = @fieldParentPtr(Node, "tree_node", tree_node); | |
std.debug.assert(head.tail == node); | |
head.tail = prev; | |
} | |
} else if (node.next) |next| { | |
self.tree.replace(&node.tree_node, &next.tree_node); | |
next.tail = node.tail; | |
next.prev = null; | |
} else { | |
self.tree.remove(&node.tree_node); | |
} | |
@atomicStore(bool, &node.is_pending, false, .Monotonic); | |
if (self.tree.peekMin() == null) { | |
@atomicStore(bool, &self.has_pending, false, .Monotonic); | |
} | |
return true; | |
} | |
pub const Poll = union(enum){ | |
empty, | |
wait: u64, | |
expired: Task.Group, | |
}; | |
pub fn poll(self: *Timer) Poll { | |
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) { | |
return .empty; | |
} | |
if (!self.tree_lock.tryAcquire()) return .empty; | |
defer self.tree_lock.release(); | |
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) { | |
return .empty; | |
} | |
var expired = Task.Group{}; | |
var next_delay: ?u64 = null; | |
var timestamp: ?System.Instant = null; | |
while (true) { | |
const tree_node = self.tree.peekMin() orelse { | |
@atomicStore(bool, &self.has_pending, false, .Monotonic); | |
break; | |
}; | |
const now = timestamp orelse blk: { | |
timestamp = System.Instant.now(); | |
break :blk timestamp orelse unreachable; | |
}; | |
if (now.elapsed(tree_node.key.deadline)) |expired_since| { | |
self.tree.remove(tree_node); | |
} else { | |
next_delay = tree_node.key.deadline.elapsed(now); | |
break; | |
} | |
var nodes: ?*Node = @fieldParentPtr(Node, "tree_node", tree_node); | |
while (nodes) |node| { | |
nodes = node.next; | |
@atomicStore(bool, &node.is_pending, false, .Monotonic); | |
expired.append(node.group); | |
} | |
} | |
if (!expired.isEmpty()) return Poll{ .expired = expired }; | |
if (next_delay) |delay| return Poll{ .wait = delay }; | |
return .empty; | |
} | |
}; | |
const Node = struct { | |
next: usize, | |
pub const Batch = struct { | |
head: ?*Node = null, | |
tail: *Node = undefined, | |
pub fn from(node: *Node) Batch { | |
node.next = 0; | |
return Batch{ | |
.head = node, | |
.tail = node, | |
}; | |
} | |
pub fn isEmpty(self: Batch) bool { | |
return self.head == null; | |
} | |
pub fn push(self: *Batch, batch: Batch, order: enum{ lifo, fifo }) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
if (self.isEmpty()) { | |
self.* = batch; | |
return; | |
} | |
switch (order) { | |
.lifo => { | |
batch.tail.next = @ptrToInt(self.head); | |
self.head = batch.head; | |
}, | |
.fifo => { | |
self.tail.next = @ptrToInt(batch.head); | |
self.tail = batch.tail; | |
}, | |
} | |
} | |
pub fn pop(self: *Batch) ?*Node { | |
const node = self.head orelse return null; | |
self.head = @intToPtr(?*Node, node.next); | |
return node; | |
} | |
}; | |
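// Queue is an intrusive MPSC queue in the style of Vyukov's algorithm: the | |
// embedded `head` node acts as a stub, producers exchange `tail` and link | |
// through `next`, and the consumer marks the low bit of head.next while popping. | |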
pub const Queue = struct { | |
head: Node = .{ .next = 0 }, | |
tail: ?*Node = null, | |
pub fn isEmpty(self: *const Queue) bool { | |
return @atomicLoad(usize, &self.head.next, .Monotonic) == 0; | |
} | |
pub fn push(self: *Queue, batch: Batch) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
const old_tail = @atomicRmw(?*Node, &self.tail, .Xchg, batch.tail, .AcqRel); | |
const tail = old_tail orelse &self.head; | |
@atomicStore(usize, &tail.next, @ptrToInt(batch.head), .Release); | |
} | |
pub fn popAssumeOwned(self: *Queue) ?*Node { | |
const next = @atomicLoad(usize, &self.head.next, .Monotonic); | |
if (next == 0) { | |
return null; | |
} | |
std.debug.assert(next & 1 == 0); | |
return self.popWithAcquired(@intToPtr(*Node, next)); | |
} | |
pub fn pop(self: *Queue) ?*Node { | |
var next = @atomicLoad(usize, &self.head.next, .Monotonic); | |
while (true) : (spinLoopHint()) { | |
if ((next == 0) or (next & 1 != 0)) { | |
return null; | |
} | |
next = @cmpxchgWeak( | |
usize, | |
&self.head.next, | |
next, | |
next | 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return self.popWithAcquired(@intToPtr(*Node, next)); | |
} | |
} | |
fn popWithAcquired(self: *Queue, head: *Node) ?*Node { | |
var next = @atomicLoad(usize, &head.next, .Acquire); | |
if (next != 0) { | |
@atomicStore(usize, &self.head.next, next, .Monotonic); | |
return head; | |
} | |
@atomicStore(usize, &self.head.next, 0, .Monotonic); | |
if (@cmpxchgStrong(?*Node, &self.tail, head, &self.head, .AcqRel, .Acquire) == null) { | |
return head; | |
} | |
var spin: u8 = 10; | |
while (spin > 0) : (spin -= 1) { | |
next = @atomicLoad(usize, &head.next, .Acquire); | |
if (next != 0) { | |
@atomicStore(usize, &self.head.next, next, .Monotonic); | |
return head; | |
} | |
} | |
@atomicStore(usize, &self.head.next, @ptrToInt(head), .Monotonic); | |
return null; | |
} | |
}; | |
}; | |
}; | |
fn Treap(comptime KeyType: type) type { | |
return struct { | |
root: ?*Node = null, | |
min: @TypeOf(min_init) = min_init, | |
const Self = @This(); | |
pub const Key = KeyType; | |
const min_init = if (Key.use_min) @as(?*Node, null) else {}; | |
const Parent = struct { | |
node: *Node, | |
is_left: bool, | |
fn pack(parent: ?Parent) usize { | |
const self = parent orelse return 0; | |
return @ptrToInt(self.node) | @as(usize, @boolToInt(self.is_left)); | |
} | |
fn unpack(value: usize) ?Parent { | |
return Parent{ | |
.node = @intToPtr(?*Node, value & ~@as(usize, 0b1)) orelse return null, | |
.is_left = value & 1 != 0, | |
}; | |
} | |
}; | |
pub const Node = struct { | |
key: Key, | |
parent: usize, | |
children: [2]?*Node, | |
}; | |
pub const Lookup = struct { | |
node: ?*Node, | |
parent: ?Parent, | |
}; | |
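// This treap uses the key itself for both the search order (isLessThan) and the | |
// heap order (isMoreImportant), and optionally tracks the minimum node so that | |
// peekMin() is O(1) when Key.use_min is set. | |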
pub fn find(self: *Self, key: Key) Lookup { | |
var lookup = Lookup{ | |
.node = self.root, | |
.parent = null, | |
}; | |
while (true) { | |
const node = lookup.node orelse break; | |
if (Key.isEqual(key, node.key)) | |
break; | |
const is_left = Key.isLessThan(key, node.key); | |
lookup = Lookup{ | |
.node = node.children[@boolToInt(!is_left)], | |
.parent = Parent{ | |
.node = node, | |
.is_left = is_left, | |
}, | |
}; | |
} | |
return lookup; | |
} | |
pub fn insert(self: *Self, lookup: Lookup, node: *Node, key: Key) void { | |
node.* = Node{ | |
.key = key, | |
.parent = Parent.pack(lookup.parent), | |
.children = [2]?*Node{ null, null }, | |
}; | |
if (Key.use_min) { | |
if (self.min) |min_node| { | |
if (Key.isLessThan(key, min_node.key)) { | |
self.min = node; | |
} | |
} else { | |
self.min = node; | |
} | |
} | |
if (lookup.parent) |parent| { | |
parent.node.children[@boolToInt(!parent.is_left)] = node; | |
} else { | |
self.root = node; | |
} | |
self.fixupInsert(node); | |
} | |
pub fn remove(self: *Self, node: *Node) void { | |
if (Key.use_min and self.min == node) { | |
self.min = self.findNextMin(node); | |
} | |
self.fixupRemove(node); | |
if (Parent.unpack(node.parent)) |parent| { | |
parent.node.children[@boolToInt(!parent.is_left)] = null; | |
} else { | |
self.root = null; | |
} | |
} | |
fn fixupInsert(self: *Self, node: *Node) void { | |
while (true) { | |
const parent = Parent.unpack(node.parent) orelse break; | |
if (!Key.isMoreImportant(parent.node.key, node.key)) | |
break; | |
const children = &parent.node.children; | |
std.debug.assert(node == children[0] or node == children[1]); | |
self.rotate(parent.node, node == children[1]); | |
} | |
} | |
fn fixupRemove(self: *Self, node: *Node) void { | |
while (true) { | |
if (node.children[0] == null and node.children[1] == null) | |
break; | |
self.rotate(node, blk: { | |
const right = node.children[1] orelse break :blk false; | |
const left = node.children[0] orelse break :blk true; | |
break :blk !Key.isMoreImportant(left.key, right.key); | |
}); | |
} | |
} | |
fn rotate(self: *Self, node: *Node, is_left: bool) void { | |
std.debug.assert(self.root != null); | |
std.debug.assert(node.children[@boolToInt(is_left)] != null); | |
const parent = Parent.unpack(node.parent); | |
const swap_node = node.children[@boolToInt(is_left)] orelse unreachable; | |
const child_node = swap_node.children[@boolToInt(!is_left)]; | |
swap_node.children[@boolToInt(!is_left)] = node; | |
node.parent = Parent.pack(Parent{ | |
.node = swap_node, | |
.is_left = is_left, | |
}); | |
node.children[@boolToInt(is_left)] = child_node; | |
if (child_node) |child| { | |
child.parent = Parent.pack(Parent{ | |
.node = node, | |
.is_left = !is_left, | |
}); | |
} | |
swap_node.parent = Parent.pack(parent); | |
if (parent) |p| { | |
const children = &p.node.children; | |
std.debug.assert(node == children[0] or node == children[1]); | |
children[@boolToInt(node == children[1])] = swap_node; | |
} else { | |
self.root = swap_node; | |
} | |
} | |
pub fn replace(self: *Self, node: *Node, new_node: ?*Node) void { | |
if (new_node) |new| | |
new.* = node.*; | |
if (Key.use_min and self.min == node) { | |
if (new_node) |new| { | |
self.min = new; | |
} else { | |
self.min = self.findNextMin(node); | |
} | |
} | |
if (Parent.unpack(node.parent)) |parent| { | |
parent.node.children[@boolToInt(!parent.is_left)] = new_node; | |
} else { | |
self.root = new_node; | |
} | |
for (node.children) |child_node, index| { | |
const child = child_node orelse continue; | |
child.parent = Parent.pack(blk: { | |
break :blk Parent{ | |
.node = new_node orelse break :blk null, | |
.is_left = index == 0, | |
}; | |
}); | |
} | |
} | |
fn findNextMin(self: *Self, node: *Node) ?*Node { | |
std.debug.assert(self.min == node); | |
std.debug.assert(node.children[0] == null); | |
var next_min = blk: { | |
if (node.children[1]) |right| | |
break :blk right; | |
const parent = Parent.unpack(node.parent) orelse return null; | |
std.debug.assert(node == parent.node.children[0]); | |
break :blk parent.node.children[1] orelse parent.node; | |
}; | |
while (next_min.children[0]) |left_node| { | |
if (left_node == node) | |
break; | |
next_min = left_node; | |
} | |
return next_min; | |
} | |
pub fn peekMin(self: *Self) ?*Node { | |
if (!Key.use_min) | |
@compileError("Treap not configured for priority operations"); | |
return self.min; | |
} | |
}; | |
} | |
const System = struct { | |
const is_linux = std.builtin.os.tag == .linux; | |
const is_windows = std.builtin.os.tag == .windows; | |
const is_darwin = switch (std.builtin.os.tag) { | |
.macos, .ios, .tvos, .watchos => true, | |
else => false, | |
}; | |
const is_posix = std.builtin.link_libc and (is_linux or is_bsd); | |
const is_bsd = is_linux or is_darwin or switch (std.builtin.os.tag) { | |
.freebsd, .kfreebsd, .openbsd, .netbsd, .dragonfly => true, | |
else => false, | |
}; | |
pub fn spinLoopHint() void { | |
const hint_asm: []const u8 = switch (std.builtin.arch) { | |
.i386, .x86_64 => "pause", | |
.aarch64 => "yield", | |
else => return, | |
}; | |
asm volatile(hint_asm ::: "memory"); | |
} | |
// Lock | |
pub usingnamespace struct { | |
pub const Lock = if (is_windows) | |
WindowsLock | |
else if (is_darwin) | |
DarwinLock | |
else if (is_linux) | |
LinuxLock | |
else if (is_posix) | |
PosixLock | |
else | |
@compileError("System unsupported"); | |
const WindowsLock = struct { | |
srwlock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT, | |
pub fn tryAcquire(self: *WindowsLock) bool { | |
const status = std.os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock); | |
return status != std.os.windows.FALSE; | |
} | |
pub fn acquire(self: *WindowsLock) void { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); | |
} | |
pub fn release(self: *WindowsLock) void { | |
std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock); | |
} | |
}; | |
const DarwinLock = extern struct { | |
os_unfair_lock: u32 = 0, | |
extern fn os_unfair_lock_trylock(lock: *DarwinLock) bool; | |
extern fn os_unfair_lock_lock(lock: *DarwinLock) void; | |
extern fn os_unfair_lock_unlock(lock: *DarwinLock) void; | |
pub fn tryAcquire(self: *DarwinLock) bool { | |
return os_unfair_lock_trylock(self); | |
} | |
pub fn acquire(self: *DarwinLock) void { | |
os_unfair_lock_lock(self); | |
} | |
pub fn release(self: *DarwinLock) void { | |
os_unfair_lock_unlock(self); | |
} | |
}; | |
const LinuxLock = struct { | |
state: State = .unlocked, | |
const State = enum(i32) { | |
unlocked = 0, | |
locked, | |
contended, | |
}; | |
pub fn tryAcquire(self: *LinuxLock) bool { | |
return @cmpxchgStrong( | |
State, | |
&self.state, | |
.unlocked, | |
.locked, | |
.Acquire, | |
.Monotonic, | |
) == null; | |
} | |
pub fn acquire(self: *LinuxLock) void { | |
if (@cmpxchgWeak( | |
State, | |
&self.state, | |
.unlocked, | |
.locked, | |
.Acquire, | |
.Monotonic, | |
)) |_| { | |
self.acquireSlow(); | |
} | |
} | |
fn acquireSlow(self: *LinuxLock) void { | |
@setCold(true); | |
var spin: u8 = 10; | |
var new_state = State.locked; | |
var state = @atomicLoad(State, &self.state, .Monotonic); | |
while (true) { | |
if (state == .unlocked) { | |
state = @cmpxchgWeak(State, &self.state, state, new_state, .Acquire, .Monotonic) orelse return; | |
spinLoopHint(); | |
continue; | |
} | |
if (state == .locked and spin > 0) { | |
spin -= 1; | |
spinLoopHint(); | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
continue; | |
} | |
if (state != .contended) { | |
if (@cmpxchgWeak(State, &self.state, state, .contended, .Monotonic, .Monotonic)) |updated| { | |
spinLoopHint(); | |
state = updated; | |
continue; | |
} | |
} | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@enumToInt(State.contended), | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => unreachable, | |
else => unreachable, | |
} | |
spin = 5; | |
new_state = .contended; | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
} | |
} | |
pub fn release(self: *LinuxLock) void { | |
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) { | |
.unlocked => unreachable, | |
.locked => {}, | |
.contended => self.releaseSlow(), | |
} | |
} | |
fn releaseSlow(self: *LinuxLock) void { | |
@setCold(true); | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
1, | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
}; | |
const PosixLock = struct { | |
state: usize = UNLOCKED, | |
const UNLOCKED = 0; | |
const LOCKED: usize = 1 << 0; | |
const WAKING: usize = 1 << 1; | |
const WAITING = ~(LOCKED | WAKING); | |
const Waiter = struct { | |
prev: ?*Waiter, | |
next: ?*Waiter, | |
tail: ?*Waiter, | |
event: Event align(std.math.max(~WAITING + 1, @alignOf(Event))), | |
}; | |
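// Word-sized lock: bit 0 is LOCKED, bit 1 is WAKING (a single thread is | |
// dequeuing), and the remaining bits hold a pointer to the head of an intrusive | |
// stack of Waiters; the Waiter alignment keeps those low bits free. | |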
pub fn tryAcquire(self: *PosixLock) bool { | |
var state = @atomicLoad(usize, &self.state, .Monotonic); | |
while (true) { | |
if (state & LOCKED != 0) { | |
return false; | |
} | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
state | LOCKED, | |
.Acquire, | |
.Monotonic, | |
) orelse return true; | |
} | |
} | |
pub fn acquire(self: *PosixLock) void { | |
if (@cmpxchgWeak( | |
usize, | |
&self.state, | |
UNLOCKED, | |
LOCKED, | |
.Acquire, | |
.Monotonic, | |
)) |_| { | |
self.acquireSlow(); | |
} | |
} | |
fn acquireSlow(self: *PosixLock) void { | |
@setCold(true); | |
var spin: u8 = 10; | |
var has_event = false; | |
var waiter: Waiter = undefined; | |
var state = @atomicLoad(usize, &self.state, .Monotonic); | |
while (true) { | |
if (state & LOCKED == 0) { | |
state = @cmpxchgWeak(usize, &self.state, state, state | LOCKED, .Acquire, .Monotonic) orelse break; | |
spinLoopHint(); | |
continue; | |
} | |
const head = @intToPtr(?*Waiter, state & WAITING); | |
if (head == null and spin > 0) { | |
spin -= 1; | |
spinLoopHint(); | |
state = @atomicLoad(usize, &self.state, .Monotonic); | |
continue; | |
} | |
if (!has_event) { | |
has_event = true; | |
waiter.event.init(); | |
} | |
waiter.next = head; | |
waiter.prev = null; | |
waiter.tail = if (head == null) &waiter else null; | |
if (@cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
(state & ~WAITING) | @ptrToInt(&waiter), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
spinLoopHint(); | |
state = updated; | |
continue; | |
} | |
spin = 5; | |
waiter.event.wait(null) catch unreachable; | |
waiter.event.reset(); | |
state = @atomicLoad(usize, &self.state, .Monotonic); | |
} | |
if (has_event) { | |
waiter.event.deinit(); | |
} | |
} | |
pub fn release(self: *PosixLock) void { | |
const state = @atomicRmw(usize, &self.state, .Sub, LOCKED, .Release); | |
if (state & WAITING != 0) { | |
self.releaseSlow(); | |
} | |
} | |
fn releaseSlow(self: *PosixLock) void { | |
var state = @atomicLoad(usize, &self.state, .Monotonic); | |
while (true) { | |
if ((state & WAITING == 0) or (state & (LOCKED | WAKING) != 0)) { | |
return; | |
} | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
state | WAKING, | |
.Acquire, | |
.Monotonic, | |
) orelse break; | |
} | |
state |= WAKING; | |
dequeue: while (true) { | |
const head = @intToPtr(*Waiter, state & WAITING); | |
const tail = head.tail orelse blk: { | |
var current = head; | |
while (true) { | |
const next = current.next orelse unreachable; | |
next.prev = current; | |
current = next; | |
if (current.tail) |tail| { | |
head.tail = tail; | |
break :blk tail; | |
} | |
} | |
}; | |
while (state & LOCKED != 0) : (spinLoopHint()) { | |
state = @cmpxchgWeak(usize, &self.state, state, state & ~WAKING, .Release, .Monotonic) orelse return; | |
if (state & LOCKED == 0) { | |
@fence(.Acquire); | |
continue :dequeue; | |
} | |
} | |
if (tail.prev) |new_tail| { | |
head.tail = new_tail; | |
_ = @atomicRmw(usize, &self.state, .And, ~WAKING, .Release); | |
} else { | |
while (true) : (spinLoopHint()) { | |
state = @cmpxchgWeak(usize, &self.state, state, state & LOCKED, .Release, .Monotonic) orelse break; | |
if (state & WAITING != 0) { | |
@fence(.Acquire); | |
continue :dequeue; | |
} | |
} | |
} | |
tail.event.set(); | |
return; | |
} | |
} | |
}; | |
}; | |
// Once | |
pub usingnamespace struct { | |
pub const Once = if (is_windows) | |
WindowsOnce | |
else if (is_darwin) | |
DarwinOnce | |
else if (is_linux) | |
LinuxOnce | |
else if (is_posix) | |
PosixOnce | |
else | |
@compileError("System unsupported"); | |
const WindowsOnce = struct { | |
once: std.os.windows.INIT_ONCE = std.os.windows.INIT_ONCE_STATIC_INIT, | |
pub fn call(self: *WindowsOnce, comptime onceFn: fn () void) void { | |
const Wrapper = struct { | |
fn function( | |
once: *std.os.windows.INIT_ONCE, | |
parameter: ?std.os.windows.PVOID, | |
context: ?std.os.windows.PVOID, | |
) std.os.windows.BOOL { | |
onceFn(); | |
return std.os.windows.TRUE; | |
} | |
}; | |
const status = std.os.windows.InitOnceExecuteOnce(&self.once, Wrapper.function, null, null); | |
if (status == std.os.windows.FALSE) { | |
const err = std.os.windows.unexpectedError(std.os.windows.kernel32.GetLastError()); | |
unreachable; | |
} | |
} | |
}; | |
const DarwinOnce = struct { | |
once: dispatch_once_t = 0, | |
const dispatch_once_t = usize; | |
const dispatch_function_t = fn (?*c_void) callconv(.C) void; | |
extern fn dispatch_once_f( | |
predicate: *dispatch_once_t, | |
context: ?*c_void, | |
function: dispatch_function_t, | |
) void; | |
pub fn call(self: *DarwinOnce, comptime onceFn: fn () void) void { | |
const Wrapper = struct { | |
fn function(_: ?*c_void) callconv(.C) void { | |
return onceFn(); | |
} | |
}; | |
dispatch_once_f(&self.once, null, Wrapper.function); | |
} | |
}; | |
const LinuxOnce = struct { | |
state: State = .uninit, | |
const State = enum(i32) { | |
uninit, | |
calling, | |
waiting, | |
init, | |
}; | |
pub fn call(self: *LinuxOnce, comptime onceFn: fn () void) void { | |
const state = @atomicLoad(State, &self.state, .Acquire); | |
if (state != .init) { | |
self.callSlow(state, onceFn); | |
} | |
} | |
fn callSlow(self: *LinuxOnce, current_state: State, comptime onceFn: fn() void) void { | |
@setCold(true); | |
var spin: u8 = 10; | |
var state = current_state; | |
while (true) { | |
if (state == .init) { | |
return; | |
} | |
if (state == .uninit) { | |
state = @cmpxchgWeak(State, &self.state, .uninit, .calling, .Acquire, .Acquire) orelse break; | |
spinLoopHint(); | |
continue; | |
} | |
if (state == .calling) { | |
if (spin > 0) { | |
spin -= 1; | |
spinLoopHint(); | |
state = @atomicLoad(State, &self.state, .Acquire); | |
continue; | |
} | |
if (@cmpxchgWeak(State, &self.state, .calling, .waiting, .Acquire, .Acquire)) |updated| { | |
spinLoopHint(); | |
state = updated; | |
continue; | |
} | |
} | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@enumToInt(State.waiting), | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => unreachable, | |
else => unreachable, | |
} | |
state = @atomicLoad(State, &self.state, .Acquire); | |
} | |
onceFn(); | |
state = @atomicRmw(State, &self.state, .Xchg, .init, .Release); | |
std.debug.assert(state == .calling or state == .waiting); | |
if (state == .waiting) { | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
std.math.maxInt(i32), | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => unreachable, | |
std.os.EFAULT => unreachable, | |
else => unreachable, | |
} | |
} | |
} | |
}; | |
const PosixOnce = struct { | |
state: usize = UNINIT, | |
const UNINIT = 0; | |
const INIT = 1; | |
const Waiter = struct { | |
state: usize align(std.math.max(2, @alignOf(usize))), | |
event: Event, | |
}; | |
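// The state word is either UNINIT, INIT, or a pointer to the most recently | |
// arrived Waiter; each Waiter records the previous state, forming a stack that | |
// the initializing thread walks and wakes once onceFn() has run. | |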
pub fn call(self: *PosixOnce, comptime onceFn: fn () void) void { | |
const state = @atomicLoad(usize, &self.state, .Acquire); | |
if (state != INIT) { | |
self.callSlow(state, onceFn); | |
} | |
} | |
fn callSlow(self: *PosixOnce, current_state: usize, comptime onceFn: fn() void) void { | |
@setCold(true); | |
var spin: u8 = 10; | |
var state = current_state; | |
var waiter: Waiter = undefined; | |
var has_event = false; | |
defer if (has_event) { | |
waiter.event.deinit(); | |
}; | |
while (true) { | |
if (state == INIT) { | |
return; | |
} | |
if (state != UNINIT) { | |
if (spin > 0) { | |
spin -= 1; | |
spinLoopHint(); | |
state = @atomicLoad(usize, &self.state, .Acquire); | |
continue; | |
} | |
if (!has_event) { | |
has_event = true; | |
waiter.event.init(); | |
} | |
} | |
waiter.state = state; | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
@ptrToInt(&waiter), | |
.AcqRel, | |
.Acquire, | |
) orelse break; | |
} | |
if (state != UNINIT) { | |
waiter.event.wait(null) catch unreachable; | |
return; | |
} | |
onceFn(); | |
state = @atomicRmw(usize, &self.state, .Xchg, INIT, .AcqRel); | |
std.debug.assert(state != UNINIT and state != INIT); | |
while (@intToPtr(?*Waiter, state & ~@as(usize, 1))) |pending_waiter| { | |
if (pending_waiter == &waiter) break; | |
state = pending_waiter.state; | |
pending_waiter.event.set(); | |
} | |
} | |
}; | |
}; | |
// Thread | |
pub usingnamespace struct { | |
pub const Thread = if (is_windows) | |
WindowsThread | |
else if (is_posix) | |
PosixThread | |
else if (is_linux) | |
LinuxThread | |
else | |
@compileError("System unsupported"); | |
pub const ThreadConfig = struct { | |
stack_size: usize, | |
}; | |
const WindowsThread = struct { | |
handle: std.os.windows.HANDLE, | |
pub fn spawn(config: ThreadConfig, comptime entryFn: fn(usize) void, parameter: usize) !WindowsThread { | |
const Wrapper = struct { | |
fn entry(raw_arg: std.os.windows.LPVOID) callconv(.C) std.os.windows.DWORD { | |
entryFn(@ptrToInt(raw_arg)); | |
return 0; | |
} | |
}; | |
const handle = std.os.windows.kernel32.CreateThread( | |
null, | |
std.math.max(64 * 1024, config.stack_size), | |
Wrapper.entry, | |
@intToPtr(?std.os.windows.LPVOID, parameter), | |
0, | |
null, | |
) orelse return error.SpawnError; | |
return WindowsThread{ .handle = handle }; | |
} | |
pub fn join(self: WindowsThread) void { | |
std.os.windows.WaitForSingleObjectEx(self.handle, std.os.windows.INFINITE, false) catch unreachable; | |
std.os.windows.CloseHandle(self.handle); | |
} | |
}; | |
const PosixThread = struct { | |
handle: std.c.pthread_t, | |
pub fn spawn(config: ThreadConfig, comptime entryFn: fn(usize) void, parameter: usize) !PosixThread { | |
const Wrapper = struct { | |
fn entry(raw_arg: ?*c_void) callconv(.C) ?*c_void { | |
entryFn(@ptrToInt(raw_arg)); | |
return null; | |
} | |
}; | |
var attr: std.c.pthread_attr_t = undefined; | |
if (std.c.pthread_attr_init(&attr) != 0) return error.SystemResources; | |
defer std.debug.assert(std.c.pthread_attr_destroy(&attr) == 0); | |
const stack_size = std.math.max(16 * 1024, config.stack_size); | |
if (std.c.pthread_attr_setstacksize(&attr, stack_size) != 0) { | |
return error.SystemResources; | |
} | |
var handle: std.c.pthread_t = undefined; | |
const rc = std.c.pthread_create( | |
&handle, | |
&attr, | |
Wrapper.entry, | |
@intToPtr(?*c_void, parameter), | |
); | |
return switch (rc) { | |
0 => PosixThread{ .handle = handle }, | |
else => error.SpawnError, | |
}; | |
} | |
pub fn join(self: PosixThread) void { | |
const rc = std.c.pthread_join(self.handle, null); | |
std.debug.assert(rc == 0); | |
} | |
}; | |
const LinuxThread = struct { | |
info: *Info, | |
const Info = struct { | |
mmap_ptr: usize, | |
mmap_len: usize, | |
parameter: usize, | |
handle: i32, | |
}; | |
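// The single mmap for each thread is laid out as: | |
// [guard page][stack][Info][TLS area], with everything except the guard page | |
// made readable/writable via mprotect below. | |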
pub fn spawn(config: ThreadConfig, comptime entryFn: fn(usize) void, parameter: usize) !LinuxThread { | |
var mmap_size: usize = std.mem.page_size; | |
const guard_end = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + config.stack_size, std.mem.page_size); | |
const stack_end = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size, @alignOf(Info)); | |
const info_begin = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + @sizeOf(Info), std.os.linux.tls.tls_image.alloc_align); | |
const tls_begin = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + std.os.linux.tls.tls_image.alloc_size, std.mem.page_size); | |
const mmap_bytes = try std.os.mmap( | |
null, | |
mmap_size, | |
std.os.PROT_NONE, | |
std.os.MAP_PRIVATE | std.os.MAP_ANONYMOUS, | |
-1, | |
0, | |
); | |
errdefer std.os.munmap(mmap_bytes); | |
try std.os.mprotect( | |
mmap_bytes[guard_end..], | |
std.os.PROT_READ | std.os.PROT_WRITE, | |
); | |
const info = @ptrCast(*Info, @alignCast(@alignOf(Info), &mmap_bytes[info_begin])); | |
info.* = .{ | |
.mmap_ptr = @ptrToInt(mmap_bytes.ptr), | |
.mmap_len = mmap_bytes.len, | |
.parameter = parameter, | |
.handle = undefined, | |
}; | |
var user_desc: switch (std.builtin.arch) { | |
.i386 => std.os.linux.user_desc, | |
else => void, | |
} = undefined; | |
var tls_ptr = std.os.linux.tls.prepareTLS(mmap_bytes[tls_begin..]); | |
if (std.builtin.arch == .i386) { | |
defer tls_ptr = @ptrToInt(&user_desc); | |
user_desc = .{ | |
.entry_number = std.os.linux.tls.tls_image.gdt_entry_number, | |
.base_addr = tls_ptr, | |
.limit = 0xfffff, | |
.seg_32bit = 1, | |
.contents = 0, | |
.read_exec_only = 0, | |
.limit_in_pages = 1, | |
.seg_not_present = 0, | |
.useable = 1, | |
}; | |
} | |
const flags: u32 = | |
std.os.CLONE_SIGHAND | std.os.CLONE_SYSVSEM | | |
std.os.CLONE_VM | std.os.CLONE_FS | std.os.CLONE_FILES | | |
std.os.CLONE_PARENT_SETTID | std.os.CLONE_CHILD_CLEARTID | | |
std.os.CLONE_THREAD | std.os.CLONE_DETACHED | std.os.CLONE_SETTLS; | |
const Wrapper = struct { | |
fn entry(raw_arg: usize) callconv(.C) u8 { | |
const info_ptr = @intToPtr(*Info, raw_arg); | |
entryFn(info_ptr.parameter); | |
return 0; | |
} | |
}; | |
const rc = std.os.linux.clone( | |
Wrapper.entry, | |
@ptrToInt(&mmap_bytes[stack_end]), | |
flags, | |
@ptrToInt(info), | |
&info.handle, | |
tls_ptr, | |
&info.handle, | |
); | |
return switch (std.os.linux.getErrno(rc)) { | |
0 => LinuxThread{ .info = info }, | |
else => error.SpawnError, | |
}; | |
} | |
pub fn join(self: LinuxThread) void { | |
while (true) { | |
const tid = @atomicLoad(i32, &self.info.handle, .SeqCst); | |
if (tid == 0) { | |
std.os.munmap(@intToPtr([*]align(std.mem.page_size) u8, self.info.mmap_ptr)[0..self.info.mmap_len]); | |
return; | |
} | |
const rc = std.os.linux.futex_wait(&self.info.handle, std.os.linux.FUTEX_WAIT, tid, null); | |
switch (std.os.linux.getErrno(rc)) { | |
0 => continue, | |
std.os.EINTR => continue, | |
std.os.EAGAIN => continue, | |
else => unreachable, | |
} | |
} | |
} | |
}; | |
}; | |
// Event | |
pub usingnamespace struct { | |
pub const Event = if (is_windows) | |
WindowsEvent | |
else if (is_darwin) | |
DarwinEvent | |
else if (is_linux) | |
LinuxEvent | |
else if (is_posix) | |
PosixEvent | |
else | |
@compileError("System unsupported"); | |
const WindowsEvent = struct { | |
pub fn init(self: *WindowsEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn deinit(self: *WindowsEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn wait(self: *WindowsEvent, timeout: ?u64) void { | |
@compileError("TODO"); | |
} | |
pub fn set(self: *WindowsEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn reset(self: *WindowsEvent) void { | |
@compileError("TODO"); | |
} | |
}; | |
const LinuxEvent = struct { | |
pub fn init(self: *LinuxEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn deinit(self: *LinuxEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn wait(self: *LinuxEvent, timeout: ?u64) void { | |
@compileError("TODO"); | |
} | |
pub fn set(self: *LinuxEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn reset(self: *LinuxEvent) void { | |
@compileError("TODO"); | |
} | |
}; | |
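// A minimal sketch (not part of the original) of how a futex-based LinuxEvent | |
// could look; the API shape (init/deinit/wait(timeout)/set/reset) is assumed | |
// from how Event is used above, and timed waits are elided for brevity. | |
const FutexEventSketch = struct { | |
state: i32 = 0, // 0 = unset, 1 = set | |
pub fn init(self: *FutexEventSketch) void { | |
self.* = .{}; | |
} | |
pub fn deinit(self: *FutexEventSketch) void { | |
self.* = undefined; | |
} | |
pub fn reset(self: *FutexEventSketch) void { | |
@atomicStore(i32, &self.state, 0, .Monotonic); | |
} | |
pub fn set(self: *FutexEventSketch) void { | |
@atomicStore(i32, &self.state, 1, .Release); | |
_ = std.os.linux.futex_wake( | |
&self.state, | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
std.math.maxInt(i32), | |
); | |
} | |
pub fn wait(self: *FutexEventSketch, timeout: ?u64) error{TimedOut}!void { | |
std.debug.assert(timeout == null); // timed waits are not sketched here | |
while (@atomicLoad(i32, &self.state, .Acquire) == 0) { | |
_ = std.os.linux.futex_wait( | |
&self.state, | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
0, | |
null, | |
); | |
} | |
} | |
}; | |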
const DarwinEvent = struct { | |
// pthread_key -> union(enum){ dispatch_semaphore_t, PosixResetEvent } | |
pub fn init(self: *DarwinEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn deinit(self: *DarwinEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn wait(self: *DarwinEvent, timeout: ?u64) void { | |
@compileError("TODO"); | |
} | |
pub fn set(self: *DarwinEvent) void { | |
@compileError("TODO"); | |
} | |
pub fn reset(self: *DarwinEvent) void { | |
@compileError("TODO"); | |
} | |
}; | |
const PosixEvent = struct { | |
pub fn init(self: *PosixEvent) void { | |
} | |
pub fn deinit(self: *PosixEvent) void { | |
} | |
pub fn wait(self: *PosixEvent, timeout: ?u64) void { | |
} | |
pub fn set(self: *PosixEvent) void { | |
} | |
pub fn reset(self: *PosixEvent) void { | |
} | |
}; | |
}; | |
}; |
// ---- (next file in this gist) ---- | |
const Node = struct { | |
next: ?*Node, | |
pub const Batch = struct { | |
}; | |
pub const GlobalQueue = struct { | |
stack: usize = 0, | |
const IS_POPPING: usize = 1 << 0; | |
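// GlobalQueue is a lock-free intrusive Treiber stack shared by all workers; | |
// IS_POPPING tags the low bit of `stack` while a consumer is draining it, and | |
// producers preserve that bit when they push a batch. | |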
pub fn push(self: *GlobalQueue, batch: Batch) void { | |
if (batch.len == 0) { | |
return; | |
} | |
var stack = @atomicLoad(usize, &self.stack, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
batch.tail.next = @intToPtr(?*Node, stack & ~IS_POPPING); | |
stack = @cmpxchgWeak( | |
usize, | |
&self.stack, | |
stack, | |
@ptrToInt(batch.head) | (stack & IS_POPPING), | |
.Release, | |
.Monotonic, | |
) orelse return; | |
} | |
} | |
}; | |
/// https://fzn.fr/readings/ppopp13.pdf | |
pub const LocalQueue = struct { | |
head: Index = 0, | |
tail: Index = 0, | |
overflow: ?*Node = null, | |
buffer: [capacity]*Node = undefined, | |
const Index = u8; | |
const capacity = 128; | |
comptime { | |
std.debug.assert(std.math.maxInt(Index) >= capacity); | |
} | |
const is_x86 = switch (std.builtin.arch) { | |
.i386, .x86_64 => true, | |
else => false, | |
}; | |
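// LocalQueue is a bounded single-producer ring buffer owned by one worker, with | |
// an overflow stack for batches that do not fit; other workers may steal from | |
// the buffer or grab the overflow list wholesale. | |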
pub fn push(self: *LocalQueue, _batch: Batch) void { | |
var batch = _batch; | |
if (batch.len == 0) { | |
return; | |
} | |
push_buffer: { | |
if (batch.len >= self.buffer.len) { | |
break :push_buffer; | |
} | |
var tail = self.tail; | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
const occupied = tail -% head; | |
if (batch.len <= (self.buffer.len - occupied)) { | |
while (batch.pop()) |node| { | |
@atomicStore(*Node, &self.buffer[tail % self.buffer.len], node, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(Index, &self.tail, tail, .Release); | |
return; | |
} | |
if (occupied <= (self.buffer.len / 2)) { | |
break :push_buffer; | |
} | |
const new_head = head +% (occupied / 2); | |
if (@cmpxchgWeak(Index, &self.head, head, new_head, .Acquire, .Monotonic)) |updated| { | |
head = updated; | |
continue; | |
} | |
while (head != new_head) : (head +%= 1) { | |
const node = self.buffer[head % self.buffer.len]; | |
batch.push(.{ .lifo = Batch.from(node) }); | |
} | |
break :push_buffer; | |
} | |
} | |
var overflow = @atomicLoad(?*Node, &self.overflow, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
batch.tail.next = overflow; | |
if (overflow == null) { | |
@atomicStore(?*Node, &self.overflow, batch.head, .Release); | |
return; | |
} | |
overflow = @cmpxchgWeak( | |
?*Node, | |
&self.overflow, | |
overflow, | |
batch.head, | |
.Release, | |
.Monotonic, | |
) orelse return; | |
} | |
} | |
pub const Pop = struct { | |
node: *Node, | |
pushed: bool = false, | |
}; | |
pub fn pop(self: *LocalQueue, target: anytype) ?Pop { | |
switch (@TypeOf(target)) { | |
*LocalQueue => { | |
if (self == target) { | |
return self.popLocal(); | |
} else { | |
return self.popAndStealLocal(target); | |
} | |
}, | |
*GlobalQueue => return self.popAndStealGlobal(target), | |
else => |t| @compileError("Cannot pop() from " ++ @typeName(t)), | |
} | |
} | |
fn popLocal(self: *LocalQueue) ?Pop { | |
var tail = self.tail; | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
while (head != tail) : (std.Thread.spinLoopHint()) { | |
if (is_x86) { | |
const new_tail = tail -% 1; | |
_ = @atomicRmw(Index, &self.tail, .Xchg, new_tail, .SeqCst); | |
head = @atomicLoad(Index, &self.head, .Monotonic); | |
if (head != tail) { | |
const node = self.buffer[new_tail % self.buffer.len]; | |
if (head != new_tail) { | |
return Pop{ .node = node }; | |
} | |
if (@cmpxchgStrong(Index, &self.head, head, tail, .SeqCst, .Monotonic) == null) { | |
return Pop{ .node = node }; | |
} | |
} | |
@atomicStore(Index, &self.tail, tail, .Monotonic); | |
break; | |
} else { | |
head = @cmpxchgWeak(Index, &self.head, head, head +% 1, .Monotonic, .Monotonic) orelse { | |
const node = self.buffer[head % self.buffer.len]; | |
return Pop{ .node = node }; | |
}; | |
} | |
} | |
var overflow = @atomicLoad(?*Node, &self.overflow, .Monotonic); | |
if (overflow != null) { | |
overflow = switch (is_x86) { | |
true => @atomicRmw(?*Node, &self.overflow, .Xchg, null, .Monotonic), | |
else => @cmpxchgStrong(?*Node, &self.overflow, overflow, null, .Monotonic, .Monotonic) orelse overflow, | |
}; | |
} | |
const node = overflow orelse return null; | |
overflow = node.next orelse return Pop{ .node = node }; | |
var slots = self.buffer.len - (tail -% head); | |
while (slots > 0) : (slots -= 1) { | |
const overflow_node = overflow orelse break; | |
overflow = overflow_node.next; | |
@atomicStore(*Node, &self.buffer[tail % self.buffer.len], overflow_node, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(Index, &self.tail, tail, .Release); | |
if (overflow != null) { | |
@atomicStore(?*Node, &self.overflow, overflow, .Release); | |
} | |
return Pop{ | |
.node = node, | |
.pushed = true, | |
}; | |
} | |
fn popAndStealLocal(noalias self: *LocalQueue, noalias target: *LocalQueue) ?Pop { | |
@setCold(true); | |
const tail = self.tail; | |
var target_head = @atomicLoad(Index, &target.head, .Acquire); | |
while (true) { | |
const target_tail = @atomicLoad(Index, &target.tail, .Acquire); | |
const target_size = target_tail -% target_head; | |
if (target_size == 0 or target_size == std.math.maxInt(Index)) { | |
var node = @atomicLoad(?*Node, &target.overflow, .Monotonic) orelse return null; | |
node = switch (is_x86) { | |
true => @atomicRmw(?*Node, &target.overflow, .Xchg, null, .Acquire) orelse return null, | |
else => blk: { | |
_ = @cmpxchgWeak(?*Node, &target.overflow, node, null, .Acquire, .Monotonic) orelse break :blk node; | |
std.Thread.spinLoopHint(); | |
target_head = @atomicLoad(Index, &target.head, .Acquire); | |
continue; | |
}, | |
}; | |
var overflow: ?*Node = node.next orelse return Pop{ .node = node }; | |
var new_tail = tail; | |
const head = @atomicLoad(Index, &self.head, .Monotonic); | |
while (new_tail -% head < self.buffer.len) { | |
const overflow_node = overflow orelse break; | |
overflow = overflow_node.next; | |
@atomicStore(*Node, &self.buffer[new_tail % self.buffer.len], overflow_node, .Unordered); | |
new_tail +%= 1; | |
} | |
@atomicStore(Index, &self.tail, new_tail, .Release); | |
if (overflow != null) { | |
@atomicStore(?*Node, &self.overflow, overflow, .Release); | |
} | |
return Pop{ | |
.node = node, | |
.pushed = true, | |
}; | |
} | |
const head = @atomicLoad(Index, &self.head, .Unordered); | |
const unoccupied = self.buffer.len - (tail -% head); | |
const target_steal = @intCast(Index, std.math.min( | |
unoccupied + 1, | |
target_size - (target_size / 2), | |
)); | |
std.debug.assert(target_steal >= 1); | |
if (target_steal > (target.buffer.len / 2)) { | |
std.Thread.spinLoopHint(); | |
target_head = @atomicLoad(Index, &target.head, .Acquire); | |
continue; | |
} | |
const node = @atomicLoad(*Node, &target.buffer[target_head % target.buffer.len], .Unordered); | |
if (target_steal > 1) { | |
var buffer_steal: Index = 1; | |
while (buffer_steal < target_steal) : (buffer_steal += 1) { | |
const store_index = (tail +% buffer_steal) % self.buffer.len; | |
const steal_index = (target_head +% buffer_steal) % target.buffer.len; | |
const buffer_node = @atomicLoad(*Node, &target.buffer[steal_index], .Unordered); | |
@atomicStore(*Node, &self.buffer[store_index], buffer_node, .Unordered); | |
} | |
} | |
if (@cmpxchgWeak( | |
Index, | |
&target.head, | |
target_head, | |
target_head +% target_steal, | |
.AcqRel, | |
.Acquire, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
target_head = updated; | |
continue; | |
} | |
const new_tail = tail +% (target_steal - 1); | |
if (tail != new_tail) { | |
@atomicStore(Index, &self.tail, new_tail, .Release); | |
} | |
return Pop{ | |
.node = node, | |
.pushed = tail != new_tail, | |
}; | |
} | |
} | |
fn popAndStealGlobal(noalias self: *LocalQueue, noalias target: *GlobalQueue) ?Pop { | |
var overflow = @atomicLoad(?*Node, &self.overflow, .Monotonic); | |
var node: ?*Node = null; | |
var tail = self.tail; | |
var new_tail = tail; | |
var head = @atomicLoad(Index, &self.head, .Unordered); | |
while (true) : (std.Thread.spinLoopHint()) { | |
const occupied = new_tail -% head; | |
if (node != null and occupied == self.buffer.len) { | |
head = @atomicLoad(Index, &self.head, .Monotonic); | |
if ((new_tail -% head) == occupied) { | |
break; | |
} | |
} | |
// NOTE: `consumer` refers to an exclusive consumer of the GlobalQueue (guarded | |
// by the IS_POPPING bit); its acquisition is not included in this snippet. | |
const consumer_node = consumer.pop() orelse break; | |
if (node == null) { | |
node = consumer_node; | |
} else { | |
@atomicStore(*Node, &self.buffer[new_tail % self.buffer.len], consumer_node, .Unordered); | |
new_tail +%= 1; | |
} | |
} | |
if (new_tail != tail) { | |
@atomicStore(Index, &self.tail, new_tail, .Release); | |
} | |
return Pop{ | |
.node = node orelse return null, | |
.pushed = new_tail != tail, | |
}; | |
} | |
}; | |
}; |
// ---- (next file in this gist) ---- | |
const std = @import("std"); | |
const Loop = @This(); | |
counter: usize = 0, | |
stack_size: usize, | |
max_workers: usize, | |
idle_lock: Lock = .{}, | |
idle_workers: ?*Worker = null, | |
spawned_workers: ?*Worker = null, | |
run_queues: [Priority.ARRAY_SIZE]GlobalQueue = [_]GlobalQueue{.{}} ** Priority.ARRAY_SIZE, | |
pub const RunConfig = struct { | |
max_threads: ?usize = null, | |
stack_size: ?usize = null, | |
}; | |
fn ReturnTypeOf(comptime asyncFn: anytype) type { | |
return @typeInfo(@TypeOf(asyncFn)).Fn.return_type.?; | |
} | |
pub fn run(config: RunConfig, comptime asyncFn: anytype, args: anytype) !ReturnTypeOf(asyncFn) { | |
const Args = @TypeOf(args); | |
const Result = ReturnTypeOf(asyncFn); | |
const Wrapper = struct { | |
fn entry(task: *Task, result: *?Result, arguments: Args) void { | |
suspend task.* = Task.init(@frame()); | |
const res = @call(.{}, asyncFn, arguments); | |
result.* = res; // TODO: check if this hack is still needed | |
suspend Worker.getCurrent().?.loop.shutdown(); | |
} | |
}; | |
var task: Task = undefined; | |
var result: ?Result = null; | |
var frame = async Wrapper.entry(&task, &result, args); | |
runTask(config, &task); | |
return result orelse error.AsyncFnDidNotComplete; | |
} | |
pub fn runTask(config: RunConfig, task: *Task) void { | |
if (std.builtin.single_threaded) | |
@compileError("TODO: specialize for single_threaded"); | |
const stack_size = blk: { | |
const stack_size = config.stack_size orelse 16 * 1024 * 1024; | |
break :blk std.math.max(std.mem.page_size, stack_size); | |
}; | |
const max_workers = blk: { | |
const max_threads = config.max_threads orelse std.Thread.cpuCount() catch 1; | |
break :blk std.math.max(1, max_threads); | |
}; | |
var loop = Loop{ | |
.max_workers = max_workers, | |
.stack_size = stack_size, | |
}; | |
defer loop.idle_lock.deinit(); | |
loop.run_queues[0].push(Batch.from(task)); | |
loop.counter = (Counter{ | |
.state = .waking, | |
.notified = false, | |
.idle = 0, | |
.spawned = 1, | |
}).pack(); | |
Worker.run(&loop, null); | |
} | |
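// runTask() bootstraps the loop on the calling thread: the root task is pushed | |
// onto run_queues[0], the counter is pre-packed with one spawned worker in the | |
// waking state, and Worker.run() turns the calling thread into the first worker. | |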
pub const Task = struct { | |
next: ?*Task = undefined, | |
data: usize, | |
pub fn init(frame: anyframe) Task { | |
return .{ .data = @ptrToInt(frame) | 0 }; | |
} | |
pub const Callback = fn(*Task) callconv(.C) void; | |
pub fn initCallback(callback: Callback) Task { | |
return .{ .data = @ptrToInt(callback) | 1 }; | |
} | |
fn run(self: *Task) void { | |
switch (self.data & 1) { | |
0 => resume @intToPtr(anyframe, self.data), | |
1 => @intToPtr(Callback, self.data & ~@as(usize, 1))(self), | |
else => unreachable, | |
} | |
} | |
}; | |
pub const Priority = enum(u2) { | |
Low = 0, | |
Normal = 1, | |
High = 2, | |
Handoff = 3, | |
const ARRAY_SIZE = 3; | |
fn toArrayIndex(self: Priority) usize { | |
return switch (self) { | |
.Handoff, .High => 2, | |
.Normal => 1, | |
.Low => 0, | |
}; | |
} | |
}; | |
pub fn scheduleRemote(self: *Loop, task: *Task, priority: Priority) void { | |
self.run_queues[priority.toArrayIndex()].push(Batch.from(task)); | |
self.notifyWorkersWith(null); | |
} | |
pub fn schedule(task: *Task, priority: Priority) void { | |
const worker = Worker.getCurrent() orelse @panic("Loop.schedule called outside of worker thread pool"); | |
worker.schedule(Batch.from(task), priority); | |
} | |
const Counter = struct { | |
state: State = .pending, | |
notified: bool = false, | |
idle: usize = 0, | |
spawned: usize = 0, | |
const count_bits = @divFloor(std.meta.bitCount(usize) - 3, 2); | |
const Count = std.meta.Int(.unsigned, count_bits); | |
const State = enum(u2) { | |
pending = 0, | |
waking, | |
signaled, | |
shutdown, | |
}; | |
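// Packed layout of `counter` (one usize): bits 0-1 hold State, bit 2 holds the | |
// notified flag, and the remaining bits are split evenly between the idle and | |
// spawned worker counts (count_bits each). | |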
fn pack(self: Counter) usize { | |
return ( | |
(@as(usize, @enumToInt(self.state)) << 0) | | |
(@as(usize, @boolToInt(self.notified)) << 2) | | |
(@as(usize, @intCast(Count, self.idle)) << 3) | | |
(@as(usize, @intCast(Count, self.spawned)) << (3 + count_bits)) | |
); | |
} | |
fn unpack(value: usize) Counter { | |
return .{ | |
.state = @intToEnum(State, @truncate(u2, value)), | |
.notified = value & (1 << 2) != 0, | |
.idle = @truncate(Count, value >> 3), | |
.spawned = @truncate(Count, value >> (3 + count_bits)), | |
}; | |
} | |
}; | |
fn getWorkerCount(self: *const Loop) usize { | |
const counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
return counter.spawned; | |
} | |
fn getWorkerIter(self: *const Loop) WorkerIter { | |
const spawned_workers = @atomicLoad(?*Worker, &self.spawned_workers, .Acquire); | |
return .{ .spawned = spawned_workers }; | |
} | |
const WorkerIter = struct { | |
spawned: ?*Worker = null, | |
fn next(self: *WorkerIter) ?*Worker { | |
const worker = self.spawned orelse return null; | |
self.spawned = worker.spawned_next; | |
return worker; | |
} | |
}; | |
fn beginWorkerWith(self: *Loop, worker: *Worker) void { | |
var spawned_workers = @atomicLoad(?*Worker, &self.spawned_workers, .Monotonic); | |
while (true) { | |
worker.spawned_next = spawned_workers; | |
spawned_workers = @cmpxchgWeak( | |
?*Worker, | |
&self.spawned_workers, | |
spawned_workers, | |
worker, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
} | |
fn notifyWorkersWith(self: *Loop, worker: ?*Worker) void { | |
var did_spawn = false; | |
var spawn_attempts_remaining: u8 = 5; | |
var is_waking = if (worker) |w| w.state == .waking else false; | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
if (counter.state == .shutdown) { | |
if (did_spawn) | |
self.markShutdown(); | |
return; | |
} | |
var is_spawning = false; | |
var is_signaling = false; | |
var new_counter = counter; | |
if (is_waking) { | |
std.debug.assert(counter.state == .waking); | |
if (counter.idle > 0) { | |
is_signaling = true; | |
new_counter.state = .signaled; | |
if (did_spawn) | |
new_counter.spawned -= 1; | |
} else if (spawn_attempts_remaining > 0 and (did_spawn or counter.spawned < self.max_workers)) { | |
is_spawning = true; | |
if (!did_spawn) | |
new_counter.spawned += 1; | |
} else { | |
new_counter.notified = true; | |
new_counter.state = .pending; | |
if (did_spawn) | |
new_counter.spawned -= 1; | |
} | |
} else { | |
if (counter.state == .pending and counter.idle > 0) { | |
is_signaling = true; | |
new_counter.state = .signaled; | |
} else if (counter.state == .pending and counter.spawned < self.max_workers) { | |
is_spawning = true; | |
new_counter.state = .waking; | |
new_counter.spawned += 1; | |
} else if (!counter.notified) { | |
new_counter.notified = true; | |
} else { | |
return; | |
} | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
is_waking = true; | |
if (is_signaling) { | |
self.notifyOne(); | |
return; | |
} | |
if (is_spawning) { | |
did_spawn = true; | |
if (Worker.spawn(self)) | |
return; | |
} else { | |
return; | |
} | |
std.Thread.spinLoopHint(); | |
spawn_attempts_remaining -= 1; | |
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
} | |
} | |
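// notifyWorkersWith() implements a "waking token": at most one worker is in the | |
// waking state at a time, a wake-up either signals an idle worker or spawns a | |
// new one (up to max_workers), and the notified flag coalesces redundant | |
// wake-ups while a waking worker is already active. | |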
fn suspendWorkerWith(self: *Loop, worker: *Worker) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (true) { | |
if (counter.state == .shutdown) { | |
worker.state = .shutdown; | |
self.markShutdown(); | |
return; | |
} | |
if (counter.notified or counter.state == .signaled or worker.state != .waiting) { | |
var new_counter = counter; | |
new_counter.notified = false; | |
if (counter.state == .signaled) { | |
new_counter.state = .waking; | |
if (worker.state == .waiting) | |
new_counter.idle -= 1; | |
} else if (counter.notified) { | |
if (worker.state == .waking) | |
new_counter.state = .waking; | |
if (worker.state == .waiting) | |
new_counter.idle -= 1; | |
} else { | |
if (worker.state == .waking) | |
new_counter.state = .pending; | |
if (worker.state != .waiting) | |
new_counter.idle += 1; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
if (counter.notified or counter.state == .signaled) { | |
if (counter.state == .signaled) { | |
worker.state = .waking; | |
} else if (worker.state == .waiting) { | |
worker.state = .running; | |
} | |
return; | |
} | |
} | |
worker.state = .waiting; | |
self.wait(worker); | |
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
} | |
} | |
fn shutdown(self: *Loop) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
while (counter.state != .shutdown) { | |
var new_counter = counter; | |
new_counter.state = .shutdown; | |
new_counter.idle = 0; | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
if (counter.idle > 0) | |
self.notifyAll(); | |
return; | |
} | |
} | |
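    // Atomically removes one worker from the spawned count after shutdown has begun; | |
    // the last worker to leave wakes whoever is blocked in joinWith(). | |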
fn markShutdown(self: *Loop) void { | |
var counter = Counter{ .spawned = 1 }; | |
counter = Counter.unpack(@atomicRmw(usize, &self.counter, .Sub, counter.pack(), .AcqRel)); | |
std.debug.assert(counter.state == .shutdown); | |
if (counter.spawned == 1 and counter.idle != 0) { | |
self.notifyOne(); | |
} | |
} | |
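    // Runs on the main-thread worker during shutdown: park until the spawned count reaches | |
    // zero (advertising itself via idle = 1 so the last worker wakes it), then notify and | |
    // join every spawned OS thread. | |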
fn joinWith(self: *Loop, worker: *Worker) void { | |
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Acquire)); | |
while (true) { | |
if (counter.spawned == 0) | |
break; | |
if (counter.idle == 0) { | |
var new_counter = counter; | |
new_counter.idle = 1; | |
if (@cmpxchgWeak( | |
usize, | |
&self.counter, | |
counter.pack(), | |
new_counter.pack(), | |
.Acquire, | |
.Acquire, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
counter = Counter.unpack(updated); | |
continue; | |
} | |
} | |
self.wait(worker); | |
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Acquire)); | |
} | |
var pending_workers = self.getWorkerIter(); | |
while (pending_workers.next()) |pending_worker| { | |
const thread = pending_worker.thread orelse continue; | |
pending_worker.notify(); | |
thread.join(); | |
} | |
} | |
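    // Parks a worker: under idle_lock, re-check the wake conditions so a notification | |
    // cannot be missed, and only then push onto the idle stack and block on the worker's event. | |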
fn wait(self: *Loop, worker: *Worker) void { | |
const is_waiting = blk: { | |
const held = self.idle_lock.acquire(); | |
defer held.release(); | |
const counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic)); | |
const should_wake = switch (worker.state) { | |
.shutdown => counter.spawned == 0, | |
.waiting => counter.notified or counter.state == .signaled or counter.state == .shutdown, | |
else => unreachable, | |
}; | |
if (should_wake) { | |
break :blk false; | |
} | |
worker.idle_next = self.idle_workers; | |
self.idle_workers = worker; | |
break :blk true; | |
}; | |
if (is_waiting) { | |
worker.wait(); | |
} | |
} | |
fn notifyOne(self: *Loop) void { | |
self.wake(false); | |
} | |
fn notifyAll(self: *Loop) void { | |
self.wake(true); | |
} | |
fn wake(self: *Loop, wake_all: bool) void { | |
const worker = blk: { | |
const held = self.idle_lock.acquire(); | |
defer held.release(); | |
const worker = self.idle_workers orelse { | |
return; | |
}; | |
if (wake_all) { | |
self.idle_workers = null; | |
} else { | |
self.idle_workers = worker.idle_next; | |
worker.idle_next = null; | |
} | |
break :blk worker; | |
}; | |
var idle_workers: ?*Worker = worker; | |
while (idle_workers) |idle_worker| { | |
idle_workers = idle_worker.idle_next; | |
idle_worker.notify(); | |
} | |
} | |
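    // A Worker owns one OS thread (null for the main thread), its local run queues, and | |
    // the per-thread state used for parking, work stealing, and shutdown. | |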
const Worker = struct { | |
loop: *Loop, | |
thread: ?Thread, | |
state: State = .waking, | |
wait_state: usize = 0, | |
idle_next: ?*Worker = null, | |
spawned_next: ?*Worker = null, | |
steal_targets: WorkerIter = .{}, | |
run_next: Batch = .{}, | |
run_queues: [Priority.ARRAY_SIZE]LocalQueue = [_]LocalQueue{.{}} ** Priority.ARRAY_SIZE, | |
const ThreadLocalWorkerPtr = ThreadLocalUsize(struct{}); | |
const State = enum { | |
running, | |
waking, | |
waiting, | |
shutdown, | |
}; | |
fn spawn(loop: *Loop) bool { | |
const Spawner = struct { | |
_loop: *Loop = undefined, | |
_thread: Thread = undefined, | |
put_event: Event = .{}, | |
got_event: Event = .{}, | |
fn entry(self: *@This()) void { | |
std.debug.assert(self.put_event.wait(null)); | |
const _loop = self._loop; | |
const _thread = self._thread; | |
self.got_event.set(); | |
Worker.run(_loop, _thread); | |
} | |
}; | |
var spawner = Spawner{}; | |
defer { | |
spawner.put_event.deinit(); | |
spawner.got_event.deinit(); | |
} | |
spawner._loop = loop; | |
spawner._thread = Thread.spawn(loop.stack_size, &spawner, Spawner.entry) catch return false; | |
spawner.put_event.set(); | |
std.debug.assert(spawner.got_event.wait(null)); | |
return true; | |
} | |
fn run(loop: *Loop, thread: ?Thread) void { | |
var self = Worker{ | |
.loop = loop, | |
.thread = thread, | |
}; | |
loop.beginWorkerWith(&self); | |
ThreadLocalWorkerPtr.set(@ptrToInt(&self)); | |
var tick = @truncate(u8, @ptrToInt(&self) >> @sizeOf(*Worker)); | |
while (self.state != .shutdown) { | |
tick +%= 1; | |
if (self.poll(tick)) |task| { | |
if (self.state == .waking) | |
loop.notifyWorkersWith(&self); | |
self.state = .running; | |
task.run(); | |
continue; | |
} | |
loop.suspendWorkerWith(&self); | |
} | |
if (thread == null) { | |
loop.joinWith(&self); | |
} else { | |
self.wait(); | |
} | |
} | |
fn getCurrent() ?*Worker { | |
return @intToPtr(?*Worker, ThreadLocalWorkerPtr.get()); | |
} | |
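        // Park/unpark protocol for a single worker: wait_state packs a WaitState into the | |
        // low two bits and, while waiting, a pointer to a stack-allocated Event into the | |
        // remaining bits. notify() either records a pending notification or sets that Event. | |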
const WaitState = enum(u2) { | |
Empty = 0, | |
Waiting = 1, | |
Notified = 3, | |
}; | |
fn wait(self: *Worker) void { | |
var event align(std.math.max(@alignOf(Event), 4)) = Event{}; | |
defer event.deinit(); | |
if (@cmpxchgStrong( | |
usize, | |
&self.wait_state, | |
@enumToInt(WaitState.Empty), | |
@enumToInt(WaitState.Waiting) | @ptrToInt(&event), | |
.AcqRel, | |
.Acquire, | |
)) |updated| { | |
std.debug.assert(@intToEnum(WaitState, @truncate(u2, updated)) == .Notified); | |
@atomicStore(usize, &self.wait_state, @enumToInt(WaitState.Empty), .Monotonic); | |
return; | |
} | |
const timeout: ?u64 = null; | |
const timed_out = !event.wait(timeout); | |
std.debug.assert(!timed_out); | |
} | |
fn notify(self: *Worker) void { | |
var wait_state = @atomicLoad(usize, &self.wait_state, .Monotonic); | |
while (true) { | |
var new_state: WaitState = undefined; | |
switch (@intToEnum(WaitState, @truncate(u2, wait_state))) { | |
.Empty => new_state = .Notified, | |
.Waiting => new_state = .Empty, | |
.Notified => return, | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.wait_state, | |
wait_state, | |
@enumToInt(new_state), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
wait_state = updated; | |
continue; | |
} | |
if (new_state == .Empty) { | |
const EventPtr = *align(std.math.max(@alignOf(Event), 4)) Event; | |
const event = @intToPtr(EventPtr, wait_state & ~@as(usize, 0b11)); | |
event.set(); | |
} | |
return; | |
} | |
} | |
fn schedule(self: *Worker, batch: Batch, priority: Priority) void { | |
if (batch.isEmpty()) | |
return; | |
if (priority == .Handoff) { | |
self.run_next.push(batch); | |
} else { | |
self.run_queues[priority.toArrayIndex()].push(batch); | |
} | |
self.loop.notifyWorkersWith(null); | |
} | |
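        // Polling order: every 61st tick check the loop's global queues first (for fairness); | |
        // otherwise try the local queues, then the global queues, then steal from other | |
        // workers, widening the stolen priority range on each of the four attempts. | |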
fn poll(self: *Worker, tick: u8) ?*Task { | |
if (tick % 61 == 0) { | |
if (self.pollGlobal(null)) |task| | |
return task; | |
} | |
var steal_attempt: u8 = 0; | |
while (steal_attempt < 4) : (steal_attempt += 1) { | |
if (self.pollLocal(false)) |task| | |
return task; | |
if (self.pollGlobal(steal_attempt)) |task| | |
return task; | |
if (self.pollSteal(steal_attempt)) |task| | |
return task; | |
} | |
return null; | |
} | |
fn pollLocal(self: *Worker, be_fair: bool) ?*Task { | |
var priority_order = [_]Priority{ .Handoff, .High, .Normal, .Low }; | |
if (be_fair) { | |
priority_order = [_]Priority{ .Low, .Normal, .High, .Handoff }; | |
} | |
for (priority_order) |priority| { | |
if (switch (priority) { | |
.Handoff => self.run_next.pop(), | |
else => self.run_queues[priority.toArrayIndex()].pop(be_fair) | |
}) |task| { | |
return task; | |
} | |
} | |
return null; | |
} | |
fn pollGlobal(self: *Worker, steal_attempt: ?u8) ?*Task { | |
return self.pollQueues(&self.loop.run_queues, "popAndStealGlobal", steal_attempt); | |
} | |
fn pollSteal(self: *Worker, steal_attempt: ?u8) ?*Task { | |
var iter = self.loop.getWorkerCount(); | |
while (iter > 0) : (iter -= 1) { | |
const target = self.steal_targets.next() orelse blk: { | |
self.steal_targets = self.loop.getWorkerIter(); | |
break :blk self.steal_targets.next() orelse unreachable; | |
}; | |
if (target == self) | |
continue; | |
if (self.pollQueues(&target.run_queues, "popAndStealLocal", steal_attempt)) |task| | |
return task; | |
} | |
return null; | |
} | |
fn pollQueues( | |
self: *Worker, | |
target_queues: anytype, | |
comptime popAndStealFn: []const u8, | |
steal_attempt: ?u8, | |
) ?*Task { | |
const priority_order: []const Priority = blk: { | |
const attempt = steal_attempt orelse { | |
break :blk &[_]Priority{ .Low, .Normal, .High }; | |
}; | |
break :blk switch (attempt) { | |
0 => &[_]Priority{ .High }, | |
1 => &[_]Priority{ .High, .Normal }, | |
else => &[_]Priority{ .High, .Normal, .Low }, | |
}; | |
}; | |
for (priority_order) |priority| { | |
const be_fair = steal_attempt == null; | |
const local_queue = &self.run_queues[priority.toArrayIndex()]; | |
const target_queue = &target_queues[priority.toArrayIndex()]; | |
if (@field(local_queue, popAndStealFn)(be_fair, target_queue)) |task| | |
return task; | |
} | |
return null; | |
} | |
}; | |
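    // A per-thread usize slot. The comptime UniqueKey type exists only so that each | |
    // instantiation gets its own thread-local storage. | |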
fn ThreadLocalUsize(comptime UniqueKey: type) type { | |
const is_apple_silicon = std.Target.current.isDarwin() and std.builtin.arch == .aarch64; | |
// On most platforms we use the compiler's built-in "threadlocal" keyword. | |
if (!is_apple_silicon) { | |
return struct { | |
threadlocal var tls_value: usize = 0; | |
pub fn get() usize { | |
return tls_value; | |
} | |
pub fn set(value: usize) void { | |
tls_value = value; | |
} | |
}; | |
} | |
// On Apple Silicon, LLD currently has issues that prevent the threadlocal keyword from working correctly, | |
// so for now we fall back to the OS-provided thread-local mechanism (pthread keys). | |
return struct { | |
const pthread_key_t = c_ulong; | |
const pthread_once_t = extern struct { | |
__sig: c_long = 0x30B1BCBA, | |
__opaque: [4]u8 = [_]u8{ 0, 0, 0, 0 }, | |
}; | |
extern "c" fn pthread_once(o: *pthread_once_t, f: ?fn() callconv(.C) void) callconv(.C) c_int; | |
extern "c" fn pthread_key_create(k: *pthread_key_t, d: ?fn(?*c_void) callconv(.C) void) callconv(.C) c_int; | |
extern "c" fn pthread_setspecific(k: pthread_key_t, p: ?*c_void) callconv(.C) c_int; | |
extern "c" fn pthread_getspecific(k: pthread_key_t) callconv(.C) ?*c_void; | |
var tls_key: pthread_key_t = undefined; | |
var tls_key_once: pthread_once_t = .{}; | |
fn tls_init() callconv(.C) void { | |
std.debug.assert(pthread_key_create(&tls_key, null) == 0); | |
} | |
pub fn get() usize { | |
std.debug.assert(pthread_once(&tls_key_once, tls_init) == 0); | |
return @ptrToInt(pthread_getspecific(tls_key)); | |
} | |
pub fn set(value: usize) void { | |
std.debug.assert(pthread_once(&tls_key_once, tls_init) == 0); | |
std.debug.assert(pthread_setspecific(tls_key, @intToPtr(?*c_void, value)) == 0); | |
} | |
}; | |
} | |
const Batch = struct { | |
head: ?*Task = null, | |
tail: *Task = undefined, | |
fn from(task: *Task) Batch { | |
task.next = null; | |
return Batch{ | |
.head = task, | |
.tail = task, | |
}; | |
} | |
fn isEmpty(self: Batch) bool { | |
return self.head == null; | |
} | |
fn push(self: *Batch, batch: Batch) void { | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else if (!batch.isEmpty()) { | |
self.tail.next = batch.head; | |
self.tail = batch.tail; | |
} | |
} | |
fn pushFront(self: *Batch, batch: Batch) void { | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else if (!batch.isEmpty()) { | |
batch.tail.next = self.head; | |
self.head = batch.head; | |
} | |
} | |
fn pop(self: *Batch) ?*Task { | |
const task = self.head orelse return null; | |
self.head = task.next; | |
return task; | |
} | |
}; | |
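    // Several run-queue implementations are kept around for experimentation; only the | |
    // variant selected by these two aliases is actually analyzed (Zig compiles lazily). | |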
const GlobalQueue = GlobalQueue6; | |
const LocalQueue = LocalQueue6; | |
const GlobalQueue6 = struct { | |
stack: ?*Task = null, | |
pub fn push(self: *GlobalQueue, batch: Batch) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
batch.tail.next = @atomicLoad(?*Task, &self.stack, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
batch.tail.next = @cmpxchgWeak( | |
?*Task, | |
&self.stack, | |
batch.tail.next, | |
batch.head, | |
.Release, | |
.Monotonic, | |
) orelse return; | |
} | |
} | |
}; | |
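    // Bounded ring buffer (128 slots) owned by one worker, plus a lock-free overflow stack. | |
    // When the buffer fills, half of it is migrated to the overflow stack; stealers take one | |
    // task from a victim's buffer or drain the victim's overflow stack wholesale. | |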
const LocalQueue6 = struct { | |
head: usize = 0, | |
tail: usize = 0, | |
overflow: GlobalQueue = .{}, | |
buffer: [128]*Task = undefined, | |
fn push(self: *LocalQueue, _batch: Batch) void { | |
var batch = _batch; | |
if (batch.isEmpty()) { | |
return; | |
} | |
var tail = self.tail; | |
var head = @atomicLoad(usize, &self.head, .Monotonic); | |
while (true) { | |
if (tail -% head < self.buffer.len) { | |
while (tail -% head < self.buffer.len) { | |
const task = batch.pop() orelse break; | |
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(usize, &self.tail, tail, .Release); | |
if (batch.isEmpty()) | |
return; | |
std.Thread.spinLoopHint(); | |
head = @atomicLoad(usize, &self.head, .Monotonic); | |
continue; | |
} | |
const new_head = head +% @intCast(usize, self.buffer.len / 2); | |
if (@cmpxchgWeak( | |
usize, | |
&self.head, | |
head, | |
new_head, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
head = updated; | |
continue; | |
} | |
var overflow = Batch{}; | |
while (head != new_head) : (head +%= 1) { | |
const task = self.buffer[head % self.buffer.len]; | |
overflow.push(Batch.from(task)); | |
} | |
overflow.push(batch); | |
batch = overflow; | |
batch.tail.next = @atomicLoad(?*Task, &self.overflow.stack, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
if (batch.tail.next == null) { | |
@atomicStore(?*Task, &self.overflow.stack, batch.head, .Release); | |
return; | |
} | |
batch.tail.next = @cmpxchgWeak( | |
?*Task, | |
&self.overflow.stack, | |
batch.tail.next, | |
batch.head, | |
.Release, | |
.Monotonic, | |
) orelse return; | |
} | |
} | |
} | |
fn popLocal(self: *LocalQueue) ?*Task { | |
const tail = self.tail; | |
if (@atomicLoad(usize, &self.head, .Monotonic) == tail) | |
return null; | |
@atomicStore(usize, &self.tail, tail -% 1, .SeqCst); | |
const head = @atomicLoad(usize, &self.head, .SeqCst); | |
if (tail == head) { | |
@atomicStore(usize, &self.tail, tail, .Release); | |
return null; | |
} | |
var task: ?*Task = self.buffer[(tail -% 1) % self.buffer.len]; | |
if (head != (tail -% 1)) { | |
return task; | |
} | |
if (@cmpxchgStrong(usize, &self.head, head, head +% 1, .AcqRel, .Monotonic) != null) | |
task = null; | |
@atomicStore(usize, &self.tail, tail, .Release); | |
return task; | |
} | |
fn popStealLocal(target: *LocalQueue) ?*Task { | |
while (true) : (std.Thread.spinLoopHint()) { | |
const target_head = @atomicLoad(usize, &target.head, .Acquire); | |
const target_tail = @atomicLoad(usize, &target.tail, .Acquire); | |
if (target_tail == target_head) | |
return null; | |
if (target_tail == target_head -% 1) | |
return null; | |
if (target_tail -% target_head > target.buffer.len) | |
return null; | |
const task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered); | |
_ = @cmpxchgWeak( | |
usize, | |
&target.head, | |
target_head, | |
target_head +% 1, | |
.AcqRel, | |
.Monotonic, | |
) orelse { | |
return task; | |
}; | |
} | |
} | |
fn popStealGlobal(self: *LocalQueue, target: *GlobalQueue) ?*Task { | |
if (@atomicLoad(?*Task, &target.stack, .Monotonic) == null) | |
return null; | |
const task = @atomicRmw(?*Task, &target.stack, .Xchg, null, .Acquire) orelse return null; | |
var overflow = task.next; | |
if (overflow != null) { | |
var tail = self.tail; | |
var head = @atomicLoad(usize, &self.head, .Monotonic); | |
if (tail != head) | |
std.debug.panic("tail != head on steal (t={} h={} d={})\n", .{tail, head, tail -% head}); | |
var i: usize = self.buffer.len; | |
while (i != 0) : (i -= 1) { | |
const t = overflow orelse break; | |
overflow = t.next; | |
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], t, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(usize, &self.tail, tail, .Release); | |
} | |
if (overflow != null) { | |
if (@atomicLoad(?*Task, &self.overflow.stack, .Monotonic) != null) | |
std.debug.panic("non null overflow stack", .{}); | |
@atomicStore(?*Task, &self.overflow.stack, overflow, .Release); | |
} | |
return task; | |
} | |
fn pop(self: *LocalQueue, be_fair: bool) ?*Task { | |
if (self.popLocal()) |task| | |
return task; | |
if (self.popStealGlobal(&self.overflow)) |task| | |
return task; | |
return null; | |
} | |
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task { | |
return self.popStealGlobal(&self.overflow); | |
} | |
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task { | |
if (target.popStealLocal()) |task| | |
return task; | |
if (self.popStealGlobal(&target.overflow)) |task| | |
return task; | |
return null; | |
} | |
}; | |
const GlobalQueue5 = LocalQueue; | |
const LocalQueue5 = struct { | |
stack: usize = 0, | |
local: ?*Task = null, | |
const MASK = IS_POPPING | HAS_LOCALS; | |
const HAS_LOCALS: usize = 0b01; | |
const IS_POPPING: usize = 0b10; | |
fn push(self: *LocalQueue, batch: Batch) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
var stack = @atomicLoad(usize, &self.stack, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
batch.tail.next = @intToPtr(?*Task, stack & ~MASK); | |
stack = @cmpxchgWeak( | |
usize, | |
&self.stack, | |
stack, | |
@ptrToInt(batch.head) | (stack & MASK), | |
.Release, | |
.Monotonic, | |
) orelse return; | |
} | |
} | |
fn pop(self: *LocalQueue, be_fair: bool) ?*Task { | |
var stack = @atomicLoad(usize, &self.stack, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
if (stack & ~IS_POPPING == 0) | |
return null; | |
if (stack & IS_POPPING != 0) | |
return null; | |
var new_stack = stack | IS_POPPING | HAS_LOCALS; | |
if (stack & HAS_LOCALS == 0) { | |
new_stack &= MASK; | |
} | |
stack = @cmpxchgWeak( | |
usize, | |
&self.stack, | |
stack, | |
new_stack, | |
.Acquire, | |
.Monotonic, | |
) orelse break; | |
} | |
while (true) { | |
const task = self.local orelse @intToPtr(?*Task, stack & ~MASK) orelse unreachable; | |
self.local = task.next; | |
if (self.local != null) { | |
_ = @atomicRmw(usize, &self.stack, .And, ~IS_POPPING, .Release); | |
return task; | |
} | |
stack = @atomicLoad(usize, &self.stack, .Monotonic); | |
while (true) : (std.Thread.spinLoopHint()) { | |
self.local = @intToPtr(?*Task, stack & ~MASK); | |
var new_stack: usize = 0; | |
if (self.local != null) | |
new_stack = HAS_LOCALS; | |
stack = @cmpxchgWeak( | |
usize, | |
&self.stack, | |
stack, | |
new_stack, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
return task; | |
} | |
} | |
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task { | |
return self.popAndStealLocal(be_fair, target); | |
} | |
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task { | |
return target.pop(be_fair); | |
} | |
}; | |
const GlobalQueue4 = LocalQueue; | |
const LocalQueue4 = struct { | |
lock: Lock = .{}, | |
local: ?*Task = null, | |
contended: ?*Task = null, | |
has_pending: bool = false, | |
fn push(self: *LocalQueue, batch: Batch) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
while (true) : (std.Thread.spinLoopHint()) { | |
if (self.lock.tryAcquire()) |held| { | |
@atomicStore(bool, &self.has_pending, true, .Monotonic); | |
batch.tail.next = self.local; | |
self.local = batch.head; | |
held.release(); | |
return; | |
} | |
batch.tail.next = @atomicLoad(?*Task, &self.contended, .Monotonic); | |
if (batch.tail.next == null) { | |
@atomicStore(?*Task, &self.contended, batch.head, .Release); | |
@atomicStore(bool, &self.has_pending, true, .Release); | |
return; | |
} | |
_ = @cmpxchgStrong( | |
?*Task, | |
&self.contended, | |
batch.tail.next, | |
batch.head, | |
.Release, | |
.Monotonic, | |
) orelse { | |
@atomicStore(bool, &self.has_pending, true, .Release); | |
return; | |
}; | |
} | |
} | |
fn pop(self: *LocalQueue, be_fair: bool) ?*Task { | |
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) | |
return null; | |
const held = self.lock.tryAcquire() orelse return null; | |
defer held.release(); | |
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) | |
return null; | |
while (true) { | |
if (self.local) |task| { | |
self.local = task.next; | |
return task; | |
} | |
if (@atomicRmw(?*Task, &self.contended, .Xchg, null, .Acquire)) |contended| { | |
self.local = contended; | |
continue; | |
} | |
if (@cmpxchgStrong(bool, &self.has_pending, true, false, .Acquire, .Monotonic) != null) { | |
return null; | |
} | |
} | |
} | |
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task { | |
return self.popAndStealLocal(be_fair, target); | |
} | |
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task { | |
return target.pop(be_fair); | |
} | |
}; | |
const GlobalQueue3 = LocalQueue; | |
const LocalQueue3 = struct { | |
lock: Lock = .{}, | |
batch: Batch = .{}, | |
has_pending: bool = false, | |
fn push(self: *LocalQueue, batch: Batch) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
const held = self.lock.acquire(); | |
defer held.release(); | |
@atomicStore(bool, &self.has_pending, true, .Monotonic); | |
self.batch.pushFront(batch); | |
} | |
fn pop(self: *LocalQueue, be_fair: bool) ?*Task { | |
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) | |
return null; | |
const held = self.lock.tryAcquire() orelse return null; | |
defer held.release(); | |
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) | |
return null; | |
return self.batch.pop() orelse { | |
@atomicStore(bool, &self.has_pending, false, .Monotonic); | |
return null; | |
}; | |
} | |
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task { | |
return self.popAndStealLocal(be_fair, target); | |
} | |
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task { | |
return target.pop(be_fair); | |
} | |
}; | |
const GlobalQueue2 = LocalQueue; | |
const LocalQueue2 = struct { | |
queue: UnboundedQueue = .{}, | |
fn push(self: *LocalQueue, batch: Batch) void { | |
self.queue.push(batch); | |
} | |
fn pop(self: *LocalQueue, be_fair: bool) ?*Task { | |
var consumer = self.queue.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
return consumer.pop(); | |
} | |
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task { | |
return self.popAndStealLocal(be_fair, target); | |
} | |
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task { | |
return target.pop(be_fair); | |
} | |
}; | |
const GlobalQueue1 = UnboundedQueue; | |
const LocalQueue1 = struct { | |
buffer: BoundedQueue = .{}, | |
overflow: UnboundedQueue = .{}, | |
fn push(self: *LocalQueue, batch: Batch) void { | |
if (self.buffer.push(batch)) |overflowed| | |
self.overflow.push(overflowed); | |
} | |
fn pop(self: *LocalQueue, be_fair: bool) ?*Task { | |
if (be_fair) { | |
if (self.buffer.popAndStealUnbounded(&self.overflow)) |task| | |
return task; | |
} | |
if (self.buffer.pop()) |task| | |
return task; | |
if (self.buffer.popAndStealUnbounded(&self.overflow)) |task| | |
return task; | |
return null; | |
// return self.buffer.pop() orelse self.buffer.popAndStealUnbounded(&self.overflow); | |
// if (be_fair) { | |
// if (self.buffer.pop()) |task| | |
// return task; | |
// } | |
// if (self.buffer.popAndStealUnbounded(&self.overflow)) |task| | |
// return task; | |
// if (self.buffer.pop()) |task| | |
// return task; | |
// return null; | |
} | |
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task { | |
return self.buffer.popAndStealUnbounded(target); | |
} | |
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task { | |
if (self.buffer.popAndStealBounded(&target.buffer)) |task| | |
return task; | |
if (self.buffer.popAndStealUnbounded(&target.overflow)) |task| | |
return task; | |
return null; | |
// if (self == target) | |
// return self.pop(be_fair); | |
// if (be_fair) { | |
// if (self.buffer.popAndStealBounded(&target.buffer)) |task| | |
// return task; | |
// } | |
// if (self.buffer.popAndStealUnbounded(&target.overflow)) |task| | |
// return task; | |
// if (self.buffer.popAndStealBounded(&target.buffer)) |task| | |
// return task; | |
// return null; | |
} | |
}; | |
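    // Intrusive multi-producer queue (appears to follow the Vyukov-style MPSC design): | |
    // producers push by exchanging `head` and linking through `next`; a single consumer | |
    // claims pop rights by setting bit 0 of `tail`, which otherwise stores its cursor. | |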
const UnboundedQueue = struct { | |
head: ?*Task = null, | |
tail: usize = 0, | |
stub: Task = .{ | |
.next = null, | |
.data = undefined, | |
}, | |
fn push(self: *UnboundedQueue, batch: Batch) void { | |
if (batch.isEmpty()) | |
return; | |
const head = @atomicRmw(?*Task, &self.head, .Xchg, batch.tail, .AcqRel); | |
const prev = head orelse &self.stub; | |
@atomicStore(?*Task, &prev.next, batch.head, .Release); | |
} | |
fn tryAcquireConsumer(self: *UnboundedQueue) ?Consumer { | |
// var head = @atomicLoad(?*Task, &self.head, .Monotonic); | |
// if (head == null or head == &self.stub) | |
// return null; | |
// var tail = @atomicRmw(usize, &self.tail, .Xchg, 1, .Acquire); | |
// if (tail & 1 != 0) | |
// return null; | |
while (true) : (std.Thread.spinLoopHint()) { | |
const head = @atomicLoad(?*Task, &self.head, .Monotonic); | |
if (head == null or head == &self.stub) | |
return null; | |
const tail = @atomicLoad(usize, &self.tail, .Monotonic); | |
if (tail & 1 != 0) | |
return null; | |
_ = @cmpxchgWeak( | |
usize, | |
&self.tail, | |
tail, | |
tail | 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return Consumer{ | |
.queue = self, | |
.tail = @intToPtr(?*Task, tail) orelse &self.stub, | |
}; | |
} | |
// var tail = @atomicLoad(usize, &self.tail, .Monotonic); | |
// while (true) : (std.Thread.spinLoopHint()) { | |
// const head = @atomicLoad(?*Task, &self.head, .Monotonic); | |
// if (head == null or head == &self.stub) | |
// return null; | |
// if (tail & 1 != 0) | |
// return null; | |
// tail = @cmpxchgWeak( | |
// usize, | |
// &self.tail, | |
// tail, | |
// tail | 1, | |
// .Acquire, | |
// .Monotonic, | |
// ) orelse return Consumer{ | |
// .queue = self, | |
// .tail = @intToPtr(?*Task, tail) orelse &self.stub, | |
// }; | |
// } | |
} | |
const Consumer = struct { | |
queue: *UnboundedQueue, | |
tail: *Task, | |
fn release(self: Consumer) void { | |
@atomicStore(usize, &self.queue.tail, @ptrToInt(self.tail), .Release); | |
} | |
fn pop(self: *Consumer) ?*Task { | |
var tail = self.tail; | |
var next = @atomicLoad(?*Task, &tail.next, .Acquire); | |
if (tail == &self.queue.stub) { | |
tail = next orelse return null; | |
self.tail = tail; | |
next = @atomicLoad(?*Task, &tail.next, .Acquire); | |
} | |
if (next) |task| { | |
self.tail = task; | |
return tail; | |
} | |
const head = @atomicLoad(?*Task, &self.queue.head, .Monotonic); | |
if (tail != head) { | |
return null; | |
} | |
self.queue.push(Batch.from(&self.queue.stub)); | |
if (@atomicLoad(?*Task, &tail.next, .Acquire)) |task| { | |
self.tail = task; | |
return tail; | |
} | |
return null; | |
} | |
}; | |
}; | |
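    // Fixed-size ring buffer of task pointers, pushed only by the owning worker but popped | |
    // and stolen concurrently. Bounded1 steals half of a victim's queue at a time, similar | |
    // in spirit to the Go runtime's runqsteal. | |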
const BoundedQueue = Bounded1; | |
const Bounded2 = struct { | |
head: Pos = 0, | |
tail: Pos = 0, | |
buffer: [capacity]*Task = undefined, | |
const Pos = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2); | |
const capacity = 128; | |
comptime { | |
std.debug.assert(capacity <= std.math.maxInt(Pos)); | |
} | |
fn push(self: *BoundedQueue, _batch: Batch) ?Batch { | |
var batch = _batch; | |
if (batch.isEmpty()) { | |
return null; | |
} | |
var tail = self.tail; | |
var head = @atomicLoad(Pos, &self.head, .Monotonic); | |
while (true) { | |
if (batch.isEmpty()) | |
return null; | |
if (tail -% head < self.buffer.len) { | |
while (tail -% head < self.buffer.len) { | |
const task = batch.pop() orelse break; | |
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(Pos, &self.tail, tail, .Release); | |
std.Thread.spinLoopHint(); | |
head = @atomicLoad(Pos, &self.head, .Monotonic); | |
continue; | |
} | |
const new_head = head +% @intCast(Pos, self.buffer.len / 2); | |
if (@cmpxchgWeak( | |
Pos, | |
&self.head, | |
head, | |
new_head, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
head = updated; | |
continue; | |
} | |
var overflowed = Batch{}; | |
while (head != new_head) : (head +%= 1) { | |
const task = self.buffer[head % self.buffer.len]; | |
overflowed.push(Batch.from(task)); | |
} | |
overflowed.push(batch); | |
return overflowed; | |
} | |
} | |
fn pop(self: *BoundedQueue) ?*Task { | |
const tail = self.tail; | |
if (@atomicLoad(Pos, &self.head, .Monotonic) == tail) | |
return null; | |
@atomicStore(Pos, &self.tail, tail -% 1, .SeqCst); | |
const head = @atomicLoad(Pos, &self.head, .SeqCst); | |
if (tail == head) { | |
@atomicStore(Pos, &self.tail, tail, .Release); | |
return null; | |
} | |
var task: ?*Task = self.buffer[(tail -% 1) % self.buffer.len]; | |
if (head != (tail -% 1)) { | |
return task; | |
} | |
if (@cmpxchgStrong(Pos, &self.head, head, head +% 1, .AcqRel, .Monotonic) != null) | |
task = null; | |
@atomicStore(Pos, &self.tail, tail, .Release); | |
return task; | |
} | |
fn popAndStealUnbounded(self: *BoundedQueue, target: *UnboundedQueue) ?*Task { | |
var consumer = target.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
const tail = self.tail; | |
const head = @atomicLoad(Pos, &self.head, .Monotonic); | |
var new_tail = tail; | |
var first_task: ?*Task = null; | |
while (first_task == null or (new_tail -% head < self.buffer.len)) { | |
const task = consumer.pop() orelse break; | |
if (first_task == null) { | |
first_task = task; | |
} else { | |
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered); | |
new_tail +%= 1; | |
} | |
} | |
if (new_tail != tail) | |
@atomicStore(Pos, &self.tail, new_tail, .Release); | |
return first_task; | |
} | |
fn popOneBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Task { | |
while (true) : (std.Thread.spinLoopHint()) { | |
const target_head = @atomicLoad(Pos, &target.head, .Acquire); | |
const target_tail = @atomicLoad(Pos, &target.tail, .Acquire); | |
if (target_tail == target_head) | |
return null; | |
if (target_tail == target_head -% 1) | |
return null; | |
if (target_tail -% target_head > self.buffer.len) | |
return null; | |
const task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered); | |
_ = @cmpxchgWeak( | |
Pos, | |
&target.head, | |
target_head, | |
target_head +% 1, | |
.AcqRel, | |
.Monotonic, | |
) orelse { | |
return task; | |
}; | |
} | |
} | |
fn popAndStealBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Task { | |
if (self == target) | |
return self.pop(); | |
const head = @atomicLoad(Pos, &self.head, .Monotonic); | |
const tail = self.tail; | |
if (tail != head) | |
return self.pop(); | |
return self.popOneBounded(target); | |
// var new_tail = tail; | |
// const first_task = self.popOneBounded(target) orelse return null; | |
// while (new_tail -% head < self.buffer.len) { | |
// const task = self.popOneBounded(target) orelse break; | |
// @atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered); | |
// new_tail +%= 1; | |
// } | |
// if (new_tail != tail) | |
// @atomicStore(Pos, &self.tail, new_tail, .Release); | |
// return first_task; | |
} | |
}; | |
const Bounded1 = struct { | |
head: Pos = 0, | |
tail: Pos = 0, | |
buffer: [capacity]*Task = undefined, | |
const Pos = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2); | |
const capacity = 128; | |
comptime { | |
std.debug.assert(capacity <= std.math.maxInt(Pos)); | |
} | |
fn push(self: *BoundedQueue, _batch: Batch) ?Batch { | |
var batch = _batch; | |
if (batch.isEmpty()) { | |
return null; | |
} | |
var tail = self.tail; | |
var head = @atomicLoad(Pos, &self.head, .Monotonic); | |
while (true) { | |
if (batch.isEmpty()) | |
return null; | |
if (tail -% head < self.buffer.len) { | |
while (tail -% head < self.buffer.len) { | |
const task = batch.pop() orelse break; | |
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(Pos, &self.tail, tail, .Release); | |
std.Thread.spinLoopHint(); | |
head = @atomicLoad(Pos, &self.head, .Monotonic); | |
continue; | |
} | |
const new_head = head +% @intCast(Pos, self.buffer.len / 2); | |
if (@cmpxchgWeak( | |
Pos, | |
&self.head, | |
head, | |
new_head, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
head = updated; | |
continue; | |
} | |
var overflowed = Batch{}; | |
while (head != new_head) : (head +%= 1) { | |
const task = self.buffer[head % self.buffer.len]; | |
overflowed.push(Batch.from(task)); | |
} | |
overflowed.push(batch); | |
return overflowed; | |
} | |
} | |
fn pop(self: *BoundedQueue) ?*Task { | |
var tail = self.tail; | |
var head = @atomicLoad(Pos, &self.head, .Monotonic); | |
while (tail != head) { | |
head = @cmpxchgWeak( | |
Pos, | |
&self.head, | |
head, | |
head +% 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return self.buffer[head % self.buffer.len]; | |
std.Thread.spinLoopHint(); | |
} | |
return null; | |
} | |
fn popAndStealUnbounded(self: *BoundedQueue, target: *UnboundedQueue) ?*Task { | |
var consumer = target.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
const tail = self.tail; | |
const head = @atomicLoad(Pos, &self.head, .Monotonic); | |
var new_tail = tail; | |
var first_task: ?*Task = null; | |
while (first_task == null or (new_tail -% head < self.buffer.len)) { | |
const task = consumer.pop() orelse break; | |
if (first_task == null) { | |
first_task = task; | |
} else { | |
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered); | |
new_tail +%= 1; | |
} | |
} | |
if (new_tail != tail) | |
@atomicStore(Pos, &self.tail, new_tail, .Release); | |
return first_task; | |
} | |
fn popAndStealBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Task { | |
if (self == target) | |
return self.pop(); | |
const head = @atomicLoad(Pos, &self.head, .Monotonic); | |
const tail = self.tail; | |
if (tail != head) | |
return self.pop(); | |
var target_head = @atomicLoad(Pos, &target.head, .Monotonic); | |
while (true) { | |
const target_tail = @atomicLoad(Pos, &target.tail, .Acquire); | |
const target_size = target_tail -% target_head; | |
if (target_size == 0) | |
return null; | |
var steal = target_size - (target_size / 2); | |
if (steal > target.buffer.len / 2) { | |
std.Thread.spinLoopHint(); | |
target_head = @atomicLoad(Pos, &target.head, .Monotonic); | |
continue; | |
} | |
const first_task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered); | |
var new_target_head = target_head +% 1; | |
var new_tail = tail; | |
steal -= 1; | |
while (steal > 0) : (steal -= 1) { | |
const task = @atomicLoad(*Task, &target.buffer[new_target_head % target.buffer.len], .Unordered); | |
new_target_head +%= 1; | |
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered); | |
new_tail +%= 1; | |
} | |
if (@cmpxchgWeak( | |
Pos, | |
&target.head, | |
target_head, | |
new_target_head, | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
target_head = updated; | |
continue; | |
} | |
if (new_tail != tail) | |
@atomicStore(Pos, &self.tail, new_tail, .Release); | |
return first_task; | |
} | |
} | |
}; | |
const Lock = if (std.builtin.os.tag == .windows) | |
WindowsLock | |
else if (std.Target.current.isDarwin()) | |
DarwinLock | |
else if (std.builtin.link_libc) | |
PosixLock | |
else if (std.builtin.os.tag == .linux) | |
LinuxLock | |
else | |
@compileError("Unimplemented Lock primitive for platform"); | |
const WindowsLock = struct { | |
srwlock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn tryAcquire(self: *Self) ?Held { | |
const status = std.os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock); | |
return if (status != std.os.windows.FALSE) Held{ .lock = self } else null; | |
} | |
pub fn acquire(self: *Self) Held { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock.srwlock); | |
} | |
}; | |
}; | |
const DarwinLock = extern struct { | |
os_unfair_lock: u32 = 0, | |
const Self = @This(); | |
extern fn os_unfair_lock_lock(lock: *Self) void; | |
extern fn os_unfair_lock_unlock(lock: *Self) void; | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
os_unfair_lock_lock(self); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
os_unfair_lock_unlock(self.lock); | |
} | |
}; | |
}; | |
const PosixLock = struct { | |
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
const rc = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(rc == 0 or rc == std.os.EINVAL); | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
return Held{ .lock = self }; | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
std.debug.assert(std.c.pthread_mutex_unlock(&self.lock.mutex) == 0); | |
} | |
}; | |
}; | |
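    // Futex-based lock with the usual three states (unlocked / locked / contended): | |
    // fast-path CAS, a bounded spin while merely locked, then mark contended and | |
    // futex_wait; release swaps in unlocked and wakes one waiter only if contended. | |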
const LinuxLock = struct { | |
state: State = .unlocked, | |
const Self = @This(); | |
const State = enum(i32) { | |
unlocked = 0, | |
locked, | |
contended, | |
}; | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn acquire(self: *Self) Held { | |
if (@cmpxchgWeak( | |
State, | |
&self.state, | |
.unlocked, | |
.locked, | |
.Acquire, | |
.Monotonic, | |
)) |_| { | |
self.acquireSlow(); | |
} | |
return Held{ .lock = self }; | |
} | |
fn acquireSlow(self: *Self) void { | |
@setCold(true); | |
var spin: usize = 0; | |
var lock_state = State.locked; | |
var state = @atomicLoad(State, &self.state, .Monotonic); | |
while (true) { | |
if (state == .unlocked) { | |
state = @cmpxchgWeak( | |
State, | |
&self.state, | |
state, | |
lock_state, | |
.Acquire, | |
.Monotonic, | |
) orelse return; | |
std.Thread.spinLoopHint(); | |
continue; | |
} | |
if (state == .locked and spin < 100) { | |
spin += 1; | |
std.Thread.spinLoopHint(); | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
// Keep spinning before escalating; without this, a freshly unlocked state could be | |
// CAS'd straight to .contended below and we'd futex_wait with no holder left to wake us. | |
continue; | |
} | |
if (state != .contended) { | |
if (@cmpxchgWeak( | |
State, | |
&self.state, | |
state, | |
.contended, | |
.Monotonic, | |
.Monotonic, | |
)) |updated| { | |
std.Thread.spinLoopHint(); | |
state = updated; | |
continue; | |
} | |
} | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@enumToInt(State.contended), | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => unreachable, | |
else => unreachable, | |
} | |
spin = 0; | |
lock_state = .contended; | |
state = @atomicLoad(State, &self.state, .Monotonic); | |
} | |
} | |
pub const Held = struct { | |
lock: *Self, | |
pub fn release(self: Held) void { | |
switch (@atomicRmw(State, &self.lock.state, .Xchg, .unlocked, .Release)) { | |
.unlocked => unreachable, | |
.locked => {}, | |
.contended => self.releaseSlow(), | |
} | |
} | |
fn releaseSlow(self: Held) void { | |
@setCold(true); | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, &self.lock.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
1, | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
}; | |
}; | |
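    // One-shot binary event (set once, wait with an optional timeout), backed by | |
    // SRWLOCK + condition variable, pthread mutex + cond, or a futex word. | |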
const Event = if (std.builtin.os.tag == .windows) | |
WindowsEvent | |
else if (std.builtin.link_libc) | |
PosixEvent | |
else if (std.builtin.os.tag == .linux) | |
LinuxEvent | |
else | |
@compileError("Unimplemented Event primitive for platform"); | |
const WindowsEvent = struct { | |
is_set: bool = false, | |
lock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT, | |
cond: std.os.windows.CONDITION_VARIABLE = std.os.windows.CONDITION_VARIABLE_INIT, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn set(self: *Self) void { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock); | |
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock); | |
self.is_set = true; | |
std.os.windows.kernel32.WakeConditionVariable(&self.cond); | |
} | |
threadlocal var tls_frequency: u64 = 0; | |
pub fn wait(self: *Self, timeout: ?u64) bool { | |
var counter: u64 = undefined; | |
var frequency: u64 = undefined; | |
if (timeout) |timeout_ns| { | |
counter = std.os.windows.QueryPerformanceCounter(); | |
frequency = tls_frequency; | |
if (frequency == 0) { | |
frequency = std.os.windows.QueryPerformanceFrequency(); | |
tls_frequency = frequency; | |
} | |
} | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock); | |
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock); | |
while (!self.is_set) { | |
var timeout_ms: std.os.windows.DWORD = std.os.windows.INFINITE; | |
if (timeout) |timeout_ns| { | |
const elapsed = blk: { | |
const now = std.os.windows.QueryPerformanceCounter(); | |
const a = if (now >= counter) (now - counter) else 0; | |
const b = std.time.ns_per_s; | |
const c = frequency; | |
break :blk (((a / c) * b) + ((a % c) * b) / c); | |
}; | |
if (elapsed > timeout_ns) { | |
return false; | |
} else { | |
const delay_ms = @divFloor(timeout_ns - elapsed, std.time.ns_per_ms); | |
timeout_ms = std.math.cast(std.os.windows.DWORD, delay_ms) catch timeout_ms; | |
} | |
} | |
const status = std.os.windows.kernel32.SleepConditionVariableSRW( | |
&self.cond, | |
&self.lock, | |
timeout_ms, | |
0, | |
); | |
if (status == std.os.windows.FALSE) { | |
switch (std.os.windows.kernel32.GetLastError()) { | |
.TIMEOUT => {}, | |
else => |err| { | |
const e = std.os.windows.unexpectedError(err); | |
unreachable; | |
}, | |
} | |
} | |
} | |
return true; | |
} | |
}; | |
const PosixEvent = struct { | |
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, | |
cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, | |
is_set: bool = false, | |
const Self = @This(); | |
pub fn deinit(self: *Self) void { | |
const m = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(m == 0 or m == std.os.EINVAL); | |
const c = std.c.pthread_cond_destroy(&self.cond); | |
std.debug.assert(c == 0 or c == std.os.EINVAL); | |
self.* = undefined; | |
} | |
pub fn set(self: *Self) void { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0); | |
self.is_set = true; | |
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0); | |
} | |
pub fn wait(self: *Self, timeout: ?u64) bool { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0); | |
var deadline: ?u64 = null; | |
if (timeout) |timeout_ns| { | |
deadline = timestamp(std.os.CLOCK_MONOTONIC) + timeout_ns; | |
} | |
while (!self.is_set) { | |
const deadline_ns = deadline orelse { | |
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0); | |
continue; | |
}; | |
var now_ns = timestamp(std.os.CLOCK_MONOTONIC); | |
if (now_ns >= deadline_ns) { | |
return false; | |
} else { | |
const remaining_ns = deadline_ns - now_ns; // remaining time measured on the monotonic clock | |
now_ns = timestamp(std.os.CLOCK_REALTIME) + remaining_ns; // absolute CLOCK_REALTIME deadline for pthread_cond_timedwait | |
} | |
var ts: std.os.timespec = undefined; | |
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(now_ns, std.time.ns_per_s)); | |
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(now_ns, std.time.ns_per_s)); | |
const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts); | |
std.debug.assert(rc == 0 or rc == std.os.ETIMEDOUT); | |
} | |
return true; | |
} | |
fn timestamp(comptime clock: u32) u64 { | |
if (comptime std.Target.current.isDarwin()) { | |
switch (clock) { | |
std.os.CLOCK_REALTIME => { | |
var tv: std.os.timeval = undefined; | |
std.os.gettimeofday(&tv, null); | |
return (@intCast(u64, tv.tv_sec) * std.time.ns_per_s) + (@intCast(u64, tv.tv_usec) * std.time.ns_per_us); | |
}, | |
std.os.CLOCK_MONOTONIC => { | |
var info: std.os.darwin.mach_timebase_info_data = undefined; | |
std.os.darwin.mach_timebase_info(&info); | |
var counter = std.os.darwin.mach_absolute_time(); | |
if (info.numer > 1) | |
counter *= info.numer; | |
if (info.denom > 1) | |
counter /= info.denom; | |
return counter; | |
}, | |
else => unreachable, | |
} | |
} | |
var ts: std.os.timespec = undefined; | |
std.os.clock_gettime(clock, &ts) catch return 0; | |
return (@intCast(u64, ts.tv_sec) * std.time.ns_per_s) + @intCast(u64, ts.tv_nsec); | |
} | |
}; | |
const LinuxEvent = struct { | |
state: State = .unset, | |
const Self = @This(); | |
const State = enum(i32) { | |
unset = 0, | |
set, | |
}; | |
pub fn deinit(self: *Self) void { | |
self.* = undefined; | |
} | |
pub fn set(self: *Self) void { | |
@atomicStore(State, &self.state, .set, .Release); | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
1, | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
pub fn wait(self: *Self, timeout: ?u64) bool { | |
var deadline: ?u64 = null; | |
while (true) { | |
if (@atomicLoad(State, &self.state, .Acquire) == .set) | |
return true; | |
var ts: std.os.timespec = undefined; | |
var ts_ptr: ?*std.os.timespec = null; | |
if (timeout) |timeout_ns| { | |
const delay_ns = delay: { | |
std.os.clock_gettime(std.os.CLOCK_MONOTONIC, &ts) catch return false; | |
const now_ns = (@intCast(u64, ts.tv_sec) * std.time.ns_per_s) + @intCast(u64, ts.tv_nsec); | |
if (deadline) |deadline_ns| { | |
if (now_ns >= deadline_ns) | |
return false; | |
break :delay (deadline_ns - now_ns); | |
} else { | |
deadline = now_ns + timeout_ns; | |
break :delay timeout_ns; | |
} | |
}; | |
ts_ptr = &ts; | |
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(delay_ns, std.time.ns_per_s)); | |
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(delay_ns, std.time.ns_per_s)); | |
} | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, &self.state), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@enumToInt(State.unset), | |
ts_ptr, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => return false, | |
else => unreachable, | |
} | |
} | |
} | |
}; | |
const Thread = if (std.builtin.os.tag == .windows) | |
WindowsThread | |
else if (std.builtin.link_libc) | |
PosixThread | |
else if (std.builtin.os.tag == .linux) | |
LinuxThread | |
else | |
@compileError("Unimplemented Thread primitive for platform"); | |
const WindowsThread = struct { | |
handle: std.os.windows.HANDLE, | |
const Self = @This(); | |
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self { | |
const Context = @TypeOf(context); | |
const Wrapper = struct { | |
fn entry(raw_arg: std.os.windows.LPVOID) callconv(.C) std.os.windows.DWORD { | |
entryFn(@ptrCast(Context, @alignCast(@alignOf(Context), raw_arg))); | |
return 0; | |
} | |
}; | |
const handle = std.os.windows.kernel32.CreateThread( | |
null, | |
stack_size, | |
Wrapper.entry, | |
@ptrCast(std.os.windows.LPVOID, context), | |
0, | |
null, | |
) orelse return error.SpawnError; | |
return Self{ .handle = handle }; | |
} | |
pub fn join(self: Self) void { | |
std.os.windows.WaitForSingleObjectEx(self.handle, std.os.windows.INFINITE, false) catch unreachable; | |
std.os.windows.CloseHandle(self.handle); | |
} | |
}; | |
const PosixThread = struct { | |
handle: std.c.pthread_t, | |
const Self = @This(); | |
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self { | |
const Context = @TypeOf(context); | |
const Wrapper = struct { | |
fn entry(raw_arg: ?*c_void) callconv(.C) ?*c_void { | |
entryFn(@ptrCast(Context, @alignCast(@alignOf(Context), raw_arg))); | |
return null; | |
} | |
}; | |
var attr: std.c.pthread_attr_t = undefined; | |
if (std.c.pthread_attr_init(&attr) != 0) | |
return error.SystemResources; | |
defer std.debug.assert(std.c.pthread_attr_destroy(&attr) == 0); | |
if (std.c.pthread_attr_setstacksize(&attr, stack_size) != 0) | |
return error.SystemResources; | |
var handle: std.c.pthread_t = undefined; | |
const rc = std.c.pthread_create( | |
&handle, | |
&attr, | |
Wrapper.entry, | |
@ptrCast(?*c_void, context), | |
); | |
return switch (rc) { | |
0 => Self{ .handle = handle }, | |
else => error.SpawnError, | |
}; | |
} | |
pub fn join(self: Self) void { | |
const rc = std.c.pthread_join(self.handle, null); | |
std.debug.assert(rc == 0); | |
} | |
}; | |
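    // Raw clone()-based threads for libc-free Linux builds: one mmapped region holds a | |
    // guard page, the stack, an Info block, and the TLS image; join() futex-waits on the | |
    // child tid (cleared via CLONE_CHILD_CLEARTID) and then unmaps the region. | |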
const LinuxThread = struct { | |
info: *Info, | |
const Self = @This(); | |
const Info = struct { | |
mmap_ptr: usize, | |
mmap_len: usize, | |
context: usize, | |
handle: i32, | |
}; | |
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self { | |
var mmap_size: usize = std.mem.page_size; | |
const guard_end = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + stack_size, std.mem.page_size); | |
const stack_end = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size, @alignOf(Info)); | |
const info_begin = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + @sizeOf(Info), std.os.linux.tls.tls_image.alloc_align); | |
const tls_begin = mmap_size; | |
mmap_size = std.mem.alignForward(mmap_size + std.os.linux.tls.tls_image.alloc_size, std.mem.page_size); | |
const mmap_bytes = try std.os.mmap( | |
null, | |
mmap_size, | |
std.os.PROT_NONE, | |
std.os.MAP_PRIVATE | std.os.MAP_ANONYMOUS, | |
-1, | |
0, | |
); | |
errdefer std.os.munmap(mmap_bytes); | |
try std.os.mprotect( | |
mmap_bytes[guard_end..], | |
std.os.PROT_READ | std.os.PROT_WRITE, | |
); | |
const info = @ptrCast(*Info, @alignCast(@alignOf(Info), &mmap_bytes[info_begin])); | |
info.* = .{ | |
.mmap_ptr = @ptrToInt(mmap_bytes.ptr), | |
.mmap_len = mmap_bytes.len, | |
.context = @ptrToInt(context), | |
.handle = undefined, | |
}; | |
var user_desc: switch (std.builtin.arch) { | |
.i386 => std.os.linux.user_desc, | |
else => void, | |
} = undefined; | |
var tls_ptr = std.os.linux.tls.prepareTLS(mmap_bytes[tls_begin..]); | |
if (std.builtin.arch == .i386) { | |
defer tls_ptr = @ptrToInt(&user_desc); | |
user_desc = .{ | |
.entry_number = std.os.linux.tls.tls_image.gdt_entry_number, | |
.base_addr = tls_ptr, | |
.limit = 0xfffff, | |
.seg_32bit = 1, | |
.contents = 0, | |
.read_exec_only = 0, | |
.limit_in_pages = 1, | |
.seg_not_present = 0, | |
.useable = 1, | |
}; | |
} | |
const flags: u32 = | |
std.os.CLONE_SIGHAND | std.os.CLONE_SYSVSEM | | |
std.os.CLONE_VM | std.os.CLONE_FS | std.os.CLONE_FILES | | |
std.os.CLONE_PARENT_SETTID | std.os.CLONE_CHILD_CLEARTID | | |
std.os.CLONE_THREAD | std.os.CLONE_DETACHED | std.os.CLONE_SETTLS; | |
const Context = @TypeOf(context); | |
const Wrapper = struct { | |
fn entry(raw_arg: usize) callconv(.C) u8 { | |
const info_ptr = @intToPtr(*Info, raw_arg); | |
entryFn(@intToPtr(Context, info_ptr.context)); | |
return 0; | |
} | |
}; | |
const rc = std.os.linux.clone( | |
Wrapper.entry, | |
@ptrToInt(&mmap_bytes[stack_end]), | |
flags, | |
@ptrToInt(info), | |
&info.handle, | |
tls_ptr, | |
&info.handle, | |
); | |
return switch (std.os.linux.getErrno(rc)) { | |
0 => Self{ .info = info }, | |
else => error.SpawnError, | |
}; | |
} | |
pub fn join(self: Self) void { | |
while (true) { | |
const tid = @atomicLoad(i32, &self.info.handle, .SeqCst); | |
if (tid == 0) { | |
std.os.munmap(@intToPtr([*]align(std.mem.page_size) u8, self.info.mmap_ptr)[0..self.info.mmap_len]); | |
return; | |
} | |
const rc = std.os.linux.futex_wait(&self.info.handle, std.os.linux.FUTEX_WAIT, tid, null); | |
switch (std.os.linux.getErrno(rc)) { | |
0 => continue, | |
std.os.EINTR => continue, | |
std.os.EAGAIN => continue, | |
else => unreachable, | |
} | |
} | |
} | |
}; | |
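// Demo / stress test: two fibonacci-shaped workloads. FibRecurse only fans out async frames | |
// (every call suspends and reschedules itself onto the loop); FibNaive actually computes | |
// fib(n). Both exercise scheduling, work stealing, and shutdown. | |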
pub fn main() !void { | |
const FibRecurse = struct { | |
fn call(allocator: *std.mem.Allocator, _n: usize) anyerror!void { | |
var n = _n; | |
if (n <= 1) | |
return; | |
suspend { | |
var task = Loop.Task.init(@frame()); | |
Loop.schedule(&task, .High); | |
} | |
var tasks = std.ArrayList(*@Frame(@This().call)).init(allocator); | |
defer tasks.deinit(); | |
var err: ?anyerror = null; | |
while (n > 1) { | |
const f = allocator.create(@Frame(@This().call)) catch |e| { | |
err = e; | |
break; | |
}; | |
tasks.append(f) catch |e| { | |
err = e; | |
break; | |
}; | |
f.* = async @This().call(allocator, n - 2); | |
n = n - 1; | |
} | |
for (tasks.items) |frame| { | |
(await frame) catch |e| { | |
if (err == null) | |
err = e; | |
}; | |
} | |
return err orelse {}; | |
} | |
}; | |
const FibNaive = struct { | |
fn call(allocator: *std.mem.Allocator, n: usize) anyerror!usize { | |
if (n <= 1) | |
return n; | |
suspend { | |
var task = Loop.Task.init(@frame()); | |
Loop.schedule(&task, .High); | |
} | |
const l = try allocator.create(@Frame(@This().call)); | |
defer allocator.destroy(l); | |
const r = try allocator.create(@Frame(@This().call)); | |
defer allocator.destroy(r); | |
l.* = async @This().call(allocator, n - 1); | |
r.* = async @This().call(allocator, n - 2); | |
const lv = await l; | |
const rv = await r; | |
const lc = try lv; | |
const rc = try rv; | |
return (lc + rc); | |
} | |
}; | |
const Fib = FibRecurse; | |
var gpa = std.heap.GeneralPurposeAllocator(.{}){}; | |
var win_heap = if (std.builtin.os.tag == .windows) std.heap.HeapAllocator.init() else {}; | |
const allocator = if (std.builtin.link_libc) | |
std.heap.c_allocator | |
else if (std.builtin.os.tag == .windows) | |
&win_heap.allocator | |
else | |
&gpa.allocator; | |
_ = try (try Loop.run(.{}, Fib.call, .{allocator, 30})); | |
} |