@kprotty
Last active March 20, 2024 16:11
One-file event loop
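A rough usage sketch (not part of the gist; it assumes the Loop and sync.Thread APIs defined in the first file below, and entry/demoTask are illustrative names), mirroring the testHarness pattern further down:
// pub fn main() !void {
//     const loop = Loop.instance.?;
//     try loop.init(std.heap.page_allocator);
//     defer loop.deinit();
//
//     var frame = async entry(loop);
//     loop.run(); // drive tasks on the main thread until shutdown
//     return nosuspend await frame;
// }
//
// fn entry(loop: *Loop) !void {
//     defer loop.shutdown();
//     const handle = try sync.Thread.spawn(demoTask, .{@as(u32, 41)});
//     std.debug.assert(handle.join() == 42);
// }
//
// fn demoTask(x: u32) u32 {
//     return x + 1; // spawn() yields this onto the thread pool before running it
// }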
const std = @import("std");
const Allocator = std.mem.Allocator;
pub const Loop = struct {
io_poller: IoPoller,
allocator: *Allocator,
is_running: bool = true,
is_notifying: bool = false,
run_queue: RunQueue = .{},
spawned_threads: usize = 0,
threads: []*Platform.Thread = &[_]*Platform.Thread{},
const Self = @This();
const IoPoller = Io.Backend.Poller;
var global_instance: Self = undefined;
pub const instance: ?*Self = switch (true) {
true => &global_instance,
else => null,
};
pub fn init(self: *Self, allocator: *Allocator) !void {
self.* = .{
.io_poller = undefined,
.allocator = allocator,
};
try self.io_poller.init();
errdefer self.io_poller.deinit();
var extra_threads: usize = 0;
if (!std.builtin.single_threaded)
extra_threads = (Platform.Thread.cpuCount() catch 1) - 1;
self.threads = try self.allocator.alloc(*Platform.Thread, extra_threads);
errdefer self.deinit();
for (self.threads) |*thread| {
thread.* = try Platform.Thread.spawn(Self.run, self);
self.spawned_threads += 1;
}
}
pub fn deinit(self: *Self) void {
self.shutdown();
for (self.threads[0..self.spawned_threads]) |thread|
thread.wait();
self.allocator.free(self.threads);
self.io_poller.deinit();
self.* = undefined;
}
pub fn shutdown(self: *Self) void {
@atomicStore(bool, &self.is_running, false, .SeqCst);
self.notify();
}
pub const Runnable = struct {
next: ?*Runnable = undefined,
frame: usize, // stored as usize instead of anyframe to support being printed
pub fn init(frame: anyframe) Runnable {
return .{ .frame = @ptrToInt(frame) };
}
pub fn run(self: *Runnable) void {
resume @intToPtr(anyframe, self.frame);
}
};
pub const Batch = struct {
head: ?*Runnable = null,
tail: *Runnable = undefined,
pub fn from(runnable: *Runnable) Batch {
runnable.next = null;
return Batch{
.head = runnable,
.tail = runnable,
};
}
pub fn isEmpty(self: Batch) bool {
return self.head == null;
}
pub fn push(self: *Batch, batch: Batch) void {
if (self.isEmpty()) {
self.* = batch;
} else if (!batch.isEmpty()) {
self.tail.next = batch.head;
self.tail = batch.tail;
}
}
pub fn pop(self: *Batch) ?*Runnable {
const runnable = self.head orelse return null;
self.head = runnable.next;
return runnable;
}
pub fn iter(self: Batch) Iter {
return .{ .runnable = self.head };
}
pub const Iter = struct {
runnable: ?*Runnable = null,
pub fn next(self: *Iter) ?*Runnable {
const runnable = self.runnable orelse return null;
self.runnable = runnable.next;
return runnable;
}
};
};
const RunQueue = struct {
lock: Platform.Lock = .{},
batch: Batch = .{},
has_pending: bool = false,
fn push(self: *RunQueue, batch: Batch) void {
if (batch.isEmpty())
return;
const held = self.lock.acquire();
defer held.release();
self.batch.push(batch);
@atomicStore(bool, &self.has_pending, true, .SeqCst);
}
fn pop(self: *RunQueue) ?*Runnable {
if (@atomicLoad(bool, &self.has_pending, .SeqCst) == false)
return null;
const held = self.lock.acquire();
defer held.release();
return self.batch.pop() orelse {
@atomicStore(bool, &self.has_pending, false, .SeqCst);
return null;
};
}
};
pub fn schedule(self: *Self, batch: Batch) void {
self.run_queue.push(batch);
self.notify();
}
pub fn yield(self: *Self) void {
suspend {
var runnable = Runnable.init(@frame());
self.schedule(Batch.from(&runnable));
}
}
pub fn run(self: *Self) void {
var was_waiting = false;
while (true) {
if (self.run_queue.pop()) |runnable| {
if (was_waiting)
self.notify();
was_waiting = false;
runnable.run();
continue;
}
if (@atomicLoad(bool, &self.is_running, .SeqCst)) {
self.wait();
was_waiting = true;
continue;
}
self.notify();
return;
}
}
fn notify(self: *Self) void {
if (@atomicLoad(bool, &self.is_notifying, .SeqCst))
return;
if (@atomicRmw(bool, &self.is_notifying, .Xchg, true, .SeqCst))
return;
self.io_poller.notify();
}
fn wait(self: *Self) void {
var events: IoPoller.Events = undefined;
self.io_poller.poll(&events, null);
var batch = Batch{};
defer self.run_queue.push(batch);
while (true) {
const maybe_runnable = events.next() catch {
@atomicStore(bool, &self.is_notifying, false, .SeqCst);
continue;
};
const runnable = maybe_runnable orelse break;
batch.push(Batch.from(runnable));
}
}
};
pub const sync = struct {
pub const Thread = struct {
fn ReturnTypeOf(comptime asyncFn: anytype) type {
return @typeInfo(@TypeOf(asyncFn)).Fn.return_type.?;
}
pub fn Handle(comptime T: type) type {
return struct {
frame: anyframe->T,
freeFn: fn(*@This()) void,
// TODO: must use (await (async self.join())) or else it segfaults
pub fn join(self: *@This()) T {
const result = await self.frame;
(self.freeFn)(self);
return result;
}
};
}
pub fn spawn(comptime asyncFn: anytype, args: anytype) !*Handle(ReturnTypeOf(asyncFn)) {
const loop = Loop.instance orelse {
@compileError("Evented IO is not enabled. Use `pub const io_mode = .evented` in root file");
};
const Args = @TypeOf(args);
const AsyncHandle = struct {
frame: @Frame(@This().run),
handle: Handle(ReturnTypeOf(asyncFn)),
fn free(handle: *Handle(ReturnTypeOf(asyncFn))) void {
const self = @fieldParentPtr(@This(), "handle", handle);
loop.allocator.destroy(self);
}
fn run(fn_args: Args) ReturnTypeOf(asyncFn) {
loop.yield();
return @call(.{}, asyncFn, fn_args);
}
};
const async_handle = try loop.allocator.create(AsyncHandle);
(&async_handle.frame).* = async AsyncHandle.run(args);
async_handle.handle.freeFn = AsyncHandle.free;
async_handle.handle.frame = &async_handle.frame;
return &async_handle.handle;
}
};
pub const Condvar = struct {
state: usize = 0,
semaphore: Semaphore = Semaphore.init(0),
const NOTIFIED: usize = 1 << 0;
const WAITING: usize = 1 << 1;
pub fn wait(self: *Condvar, held: Mutex.Held) void {
held.release();
defer _ = held.mutex.acquire();
// NOTIFIED -> 0: consumed notification
// x -> x + WAITING: wait on semaphore
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
state = @cmpxchgWeak(
usize,
&self.state,
state,
if (state == NOTIFIED) 0 else state + WAITING,
.SeqCst,
.SeqCst,
) orelse break;
}
if (state & NOTIFIED == 0)
self.semaphore.wait();
}
pub fn signal(self: *Condvar) void {
// NOTIFIED: nothing left to do
// 0 -> NOTIFIED: left a notification
// x -> x - WAITING: post to semaphore
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
if (state == NOTIFIED)
return;
state = @cmpxchgWeak(
usize,
&self.state,
state,
if (state == 0) NOTIFIED else state - WAITING,
.SeqCst,
.SeqCst,
) orelse break;
}
if (state != 0)
self.semaphore.post();
}
pub fn broadcast(self: *Condvar) void {
// NOTIFIED: nothing left to do
// 0 -> NOTIFIED: left a notification
// x -> 0: post to semaphore for every WAITER in x
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
if (state == NOTIFIED)
return;
state = @cmpxchgWeak(
usize,
&self.state,
state,
if (state == 0) NOTIFIED else 0,
.SeqCst,
.SeqCst,
) orelse break;
}
var waiters = state >> 1;
while (waiters > 0) : (waiters -= 1)
self.semaphore.post();
}
};
pub const Mutex = struct {
semaphore: Semaphore = Semaphore.init(1),
pub fn tryAcquire(self: *Mutex) ?Held {
if (self.semaphore.tryWait())
return Held{ .mutex = self };
return null;
}
pub fn acquire(self: *Mutex) Held {
self.semaphore.wait();
return Held{ .mutex = self };
}
pub const Held = struct {
mutex: *Mutex,
pub fn release(self: Held) void {
self.mutex.semaphore.post();
}
};
};
pub const Semaphore = struct {
channel: Channel(void),
pub fn init(permits: usize) Semaphore {
return Semaphore{
.channel = Channel(void){
.head = 0,
.tail = permits,
.buffer = {},
.capacity = std.math.maxInt(usize),
},
};
}
pub fn tryWait(self: *Semaphore) bool {
return self.channel.tryGet() != null;
}
pub fn wait(self: *Semaphore) void {
_ = self.channel.get();
}
pub fn post(self: *Semaphore) void {
std.debug.assert(self.channel.tryPut(undefined));
}
};
pub fn Channel(comptime T: type) type {
return struct {
lock: Platform.Lock = .{},
head: usize = 0,
tail: usize = 0,
putters: Waiters = .{},
getters: Waiters = .{},
buffer: if (@sizeOf(T) > 0) [*]T else void,
capacity: usize,
const Self = @This();
const Mode = enum { Blocking, NonBlocking };
const Waiters = std.SinglyLinkedList(struct {
item: T,
runnable: Loop.Runnable,
});
pub fn tryPut(self: *Self, item: T) bool {
return self.putUsing(item, .NonBlocking) != null;
}
pub fn put(self: *Self, item: T) void {
std.debug.assert(self.putUsing(item, .Blocking) != null);
}
fn putUsing(self: *Self, item: T, comptime mode: Mode) ?void {
const held = self.lock.acquire();
if (self.notify(&self.getters, held, item)) |_| {
return;
}
if (self.tail -% self.head < self.capacity) {
if (@sizeOf(T) > 0)
self.buffer[self.tail % self.capacity] = item;
self.tail +%= 1;
held.release();
return;
}
if (mode == .NonBlocking) {
held.release();
return null;
}
return self.wait(&self.putters, held, item);
}
pub fn tryGet(self: *Self) ?T {
return self.getUsing(.NonBlocking);
}
pub fn get(self: *Self) T {
return self.getUsing(.Blocking) orelse unreachable;
}
fn getUsing(self: *Self, comptime mode: Mode) ?T {
const held = self.lock.acquire();
if (self.notify(&self.putters, held, null)) |item|
return item;
if (self.tail != self.head) {
var item: T = undefined;
if (@sizeOf(T) > 0)
item = self.buffer[self.head % self.capacity];
self.head +%= 1;
held.release();
return item;
}
if (mode == .NonBlocking) {
held.release();
return null;
}
return self.wait(&self.getters, held, null);
}
fn wait(self: *Self, waiters: *Waiters, held: Platform.Lock.Held, write_item: ?T) T {
var waiter: Waiters.Node = undefined;
if (write_item) |write|
waiter.data.item = write;
waiter.data.runnable = Loop.Runnable.init(@frame());
waiters.prepend(&waiter);
suspend {
held.release();
}
return waiter.data.item;
}
fn notify(self: *Self, waiters: *Waiters, held: Platform.Lock.Held, write_item: ?T) ?T {
const waiter = waiters.popFirst() orelse return null;
const item = waiter.data.item;
if (write_item) |write|
waiter.data.item = write;
const loop = Loop.instance orelse {
@compileError("Evented IO is not enabled. Use `pub const io_mode = .evented` in root file");
};
loop.schedule(Loop.Batch.from(&waiter.data.runnable));
held.release();
return item;
}
};
}
};
pub const Io = struct {
pub const Backend = switch (std.builtin.os.tag) {
.linux => LinuxBackend,
.windows => @compileError("TODO"),
.macos, .freebsd, .openbsd, .netbsd, .dragonfly => @compileError("TODO"),
else => @compileError("Platform not supported for IO"),
};
const LinuxBackend = PosixBackend(struct {
epoll_fd: std.os.fd_t,
notify_fd: std.os.fd_t,
registered_notify_fd: bool,
const Self = @This();
pub fn init(self: *Self) !void {
self.epoll_fd = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC);
errdefer std.os.close(self.epoll_fd);
self.notify_fd = try std.os.eventfd(0, std.os.EFD_CLOEXEC | std.os.EFD_NONBLOCK);
errdefer std.os.close(self.notify_fd);
self.registered_notify_fd = false;
}
pub fn deinit(self: *Self) void {
std.os.close(self.notify_fd);
std.os.close(self.epoll_fd);
self.* = undefined;
}
pub fn notify(self: *Self) void {
var epoll_event = std.os.epoll_event{
.events = std.os.EPOLLOUT | std.os.EPOLLONESHOT,
.data = .{ .ptr = 0 },
};
var epoll_ctl_op: u32 = std.os.EPOLL_CTL_ADD;
if (self.registered_notify_fd) {
epoll_ctl_op = std.os.EPOLL_CTL_MOD;
} else {
self.registered_notify_fd = true;
}
std.os.epoll_ctl(
self.epoll_fd,
epoll_ctl_op,
self.notify_fd,
&epoll_event,
) catch unreachable;
}
pub fn poll(self: *Self, events: []Event, timeout: ?u64) usize {
var timeout_ms: i32 = -1;
if (timeout) |timeout_ns| {
timeout_ms = std.math.cast(i32, timeout_ns / std.time.ns_per_ms) catch -1;
}
return std.os.epoll_wait(
self.epoll_fd,
@ptrCast([*]std.os.epoll_event, events.ptr)[0..events.len],
timeout_ms,
);
}
pub const Event = extern struct {
epoll_event: std.os.epoll_event,
pub fn getData(self: Event) usize {
return self.epoll_event.data.ptr;
}
pub fn isNotified(self: Event) bool {
return self.getData() == 0;
}
pub fn isReadable(self: Event) bool {
const flags = std.os.EPOLLERR | std.os.EPOLLHUP | std.os.EPOLLIN | std.os.EPOLLRDHUP;
return self.epoll_event.events & flags != 0;
}
pub fn isWritable(self: Event) bool {
const flags = std.os.EPOLLERR | std.os.EPOLLHUP | std.os.EPOLLOUT;
return self.epoll_event.events & flags != 0;
}
};
});
fn PosixBackend(comptime IoPoller: type) type {
return struct {
pub const Poller = struct {
io_poller: IoPoller,
const Self = @This();
pub fn init(self: *Self) !void {
try self.io_poller.init();
}
pub fn deinit(self: *Self) void {
self.io_poller.deinit();
}
pub fn notify(self: *Self) void {
self.io_poller.notify();
}
pub fn poll(self: *Self, events: *Events, timeout: ?u64) void {
events.* = Events{};
events.found = self.io_poller.poll(&events.events, timeout);
return;
}
pub const Events = struct {
events: [64]IoPoller.Event = undefined,
readers: ?*Loop.Runnable = null,
writers: ?*Loop.Runnable = null,
index: usize = 0,
found: usize = 0,
fn pop(stack: *?*Loop.Runnable) ?*Loop.Runnable {
const runnable = stack.* orelse return null;
stack.* = runnable.next;
return runnable;
}
pub fn next(self: *Events) error{Notified}!?*Loop.Runnable {
if (pop(&self.writers) orelse pop(&self.readers)) |runnable|
return runnable;
while (self.index < self.found) {
const event = self.events[self.index];
self.index += 1;
if (event.isNotified())
return error.Notified;
const async_fd = @intToPtr(*AsyncFd, event.getData());
if (event.isWritable())
self.writers = async_fd.wake(.Write);
if (event.isReadable())
self.readers = async_fd.wake(.Read);
if (pop(&self.writers) orelse pop(&self.readers)) |runnable|
return runnable;
}
return null;
}
};
};
const AsyncFd = struct {
fd: std.os.fd_t,
next: ?*Self = null,
reader: ?*Loop.Runnable = null,
writer: ?*Loop.Runnable = null,
const Self = @This();
const Event = enum { Read, Write };
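// Sentinel: when wake() finds no waiter it stores NOTIFIED, so a later wait() consumes the token and reschedules itself immediately instead of suspending.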
const NOTIFIED = @intToPtr(*Loop.Runnable, @alignOf(Loop.Runnable));
var cache_lock = Platform.Lock{};
var cache_top: ?*Self = null;
var cache_gpa = std.heap.GeneralPurposeAllocator(.{}){};
fn alloc() !*Self {
const held = cache_lock.acquire();
defer held.release();
if (cache_top) |self| {
cache_top = self.next;
return self;
} else {
return cache_gpa.allocator.create(Self);
}
}
fn free(self: *Self) void {
const held = cache_lock.acquire();
defer held.release();
self.next = cache_top;
cache_top = self;
}
fn wait(self: *Self, event: Event) void {
const queue_ptr = switch (event) {
.Read => &self.reader,
.Write => &self.writer,
};
var runnable = Loop.Runnable.init(@frame());
suspend {
// NOTIFIED -> null: consume notification & tail resume
// x -> our runnable: enqueue ourselves to stack and suspend
var queue = @atomicLoad(?*Loop.Runnable, queue_ptr, .SeqCst);
while (true) {
runnable.next = queue;
queue = @cmpxchgWeak(
?*Loop.Runnable,
queue_ptr,
queue,
if (queue == NOTIFIED) null else &runnable,
.SeqCst,
.SeqCst,
) orelse {
if (queue == NOTIFIED)
Loop.instance.?.schedule(Loop.Batch.from(&runnable));
break;
};
}
}
}
fn wake(self: *Self, event: Event) ?*Loop.Runnable {
const queue_ptr = switch (event) {
.Read => &self.reader,
.Write => &self.writer,
};
var queue = @atomicLoad(?*Loop.Runnable, queue_ptr, .SeqCst);
while (true) {
if (queue == NOTIFIED)
return null;
queue = @cmpxchgWeak(
?*Loop.Runnable,
queue_ptr,
queue,
if (queue == null) NOTIFIED else null,
.SeqCst,
.SeqCst,
) orelse {
return queue;
};
}
}
};
};
}
};
const Platform = struct {
pub const Thread = std.Thread;
pub usingnamespace if (std.builtin.os.tag == .windows)
WindowsPlatform
else if (std.Target.current.isDarwin())
DarwinPlatform
else if (std.builtin.link_libc)
PosixPlatform
else if (std.builtin.os.tag == .linux)
LinuxPlatform
else
@compileError("Platform not supported");
const WindowsPlatform = @compileError("TODO");
const DarwinPlatform = @compileError("TODO");
const PosixPlatform = struct {
pub const Lock = struct {
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER,
const Self = @This();
pub fn acquire(self: *Self) Held {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
return Held{ .lock = self };
}
pub const Held = struct {
lock: *Self,
pub fn release(self: Held) void {
std.debug.assert(std.c.pthread_mutex_unlock(&self.lock.mutex) == 0);
}
};
};
};
const LinuxPlatform = struct {
pub const Lock = struct {
state: State = .Unlocked,
const Self = @This();
const State = enum(i32) {
Unlocked = 0,
Locked,
Contended,
};
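// A three-state futex lock (unlocked / locked / locked-with-waiters): release() only issues a futex wake when the previous state was Contended, in the style of the classic "Futexes Are Tricky" mutex.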
pub fn deinit(self: *Self) void {
self.* = undefined;
}
pub fn acquire(self: *Self) Held {
if (@cmpxchgWeak(
State,
&self.state,
.Unlocked,
.Locked,
.Acquire,
.Monotonic,
)) |_| {
self.acquireSlow();
}
return Held{ .lock = self };
}
fn acquireSlow(self: *Self) void {
@setCold(true);
var spin: usize = 0;
var lock_state = State.Locked;
var state = @atomicLoad(State, &self.state, .Monotonic);
while (true) {
if (state == .Unlocked) {
state = @cmpxchgWeak(
State,
&self.state,
state,
lock_state,
.Acquire,
.Monotonic,
) orelse return;
std.Thread.spinLoopHint();
continue;
}
const max_spin = switch (std.builtin.arch) {
.i386, .x86_64 => 10,
.aarch64 => 5,
else => 0,
};
if (state == .Locked and spin < max_spin) {
spin += 1;
std.Thread.spinLoopHint();
state = @atomicLoad(State, &self.state, .Monotonic);
continue;
}
if (state != .Contended) {
if (@cmpxchgWeak(
State,
&self.state,
state,
.Contended,
.Monotonic,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
state = updated;
continue;
}
}
switch (std.os.linux.getErrno(std.os.linux.futex_wait(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT,
@enumToInt(State.Contended),
null,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
std.os.ETIMEDOUT => unreachable,
else => unreachable,
}
spin = 0;
lock_state = .Contended;
state = @atomicLoad(State, &self.state, .Monotonic);
}
}
pub const Held = struct {
lock: *Self,
pub fn release(self: Held) void {
switch (@atomicRmw(State, &self.lock.state, .Xchg, .Unlocked, .Release)) {
.Unlocked => unreachable,
.Locked => {},
.Contended => self.releaseSlow(),
}
}
fn releaseSlow(self: Held) void {
@setCold(true);
switch (std.os.linux.getErrno(std.os.linux.futex_wake(
@ptrCast(*const i32, &self.lock.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE,
1,
))) {
0 => {},
std.os.EINVAL => {},
std.os.EACCES => {},
std.os.EFAULT => {},
else => unreachable,
}
}
};
};
};
};
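// --- (separate gist file) ---
// A second take on the event loop: std.Thread.Mutex plus a SinglyLinkedList run queue,
// a Running/Waking/Shutdown state enum, and epoll/kqueue pollers.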
const std = @import("std");
pub const Loop = struct {
mutex: std.Thread.Mutex,
event_poller: EventPoller,
state: State,
spawned: usize,
threads: []*std.Thread,
allocator: *std.mem.Allocator,
run_queue: std.SinglyLinkedList(anyframe),
const State = enum(u8) {
Running,
Waking,
Shutdown,
};
pub fn init(self: *Loop, allocator: *std.mem.Allocator) !void {
self.* = .{
.mutex = .{},
.event_poller = try EventPoller.init(),
.state = .Running,
.spawned = 0,
.threads = &[_]*std.Thread{},
.allocator = allocator,
.run_queue = .{},
};
errdefer self.deinit();
var num_threads: usize = 0;
if (!std.builtin.single_threaded)
num_threads = std.math.max(1, std.Thread.cpuCount() catch 1);
self.threads = try allocator.alloc(*std.Thread, num_threads);
for (self.threads) |*thread| {
thread.* = try std.Thread.spawn(Loop.run, self);
self.spawned += 1;
}
}
pub fn deinit(self: *Loop) void {
self.shutdown();
for (self.threads) |thread| {
thread.wait();
}
self.allocator.free(self.threads);
self.event_poller.deinit();
self.* = undefined;
}
pub fn shutdown(self: *Loop) void {
const state = @atomicRmw(State, &self.state, .Xchg, .Shutdown, .SeqCst);
if (state != .Waking) {
self.event_poller.notify();
}
}
pub fn run(self: *Loop) void {
var is_running = true;
while (true) {
if (self.pop()) |task| {
if (!is_running) self.notify();
is_running = true;
resume task.data;
continue;
}
const state = @atomicLoad(State, &self.state, .SeqCst);
if (state != .Shutdown) {
is_running = false;
self.wait();
continue;
}
self.notify();
return;
}
}
pub const Task = std.SinglyLinkedList(anyframe).Node;
pub fn push(self: *Loop, task: *Task) void {
{
const held = self.mutex.acquire();
defer held.release();
self.run_queue.prepend(task);
}
self.notify();
}
fn pop(self: *Loop) ?*Task {
const held = self.mutex.acquire();
defer held.release();
return self.run_queue.popFirst();
}
fn notify(self: *Loop) void {
if (@cmpxchgStrong(State, &self.state, .Running, .Waking, .SeqCst, .SeqCst) == null) {
self.event_poller.notify();
}
}
fn wait(self: *Loop) void {
var events = self.event_poller.poll();
while (events.next()) |task_ptr| {
if (@intToPtr(?*Task, task_ptr)) |task| {
self.push(task);
continue;
}
_ = @cmpxchgStrong(
State,
&self.state,
.Waking,
.Running,
.SeqCst,
.SeqCst,
);
}
}
const EventPoller = switch (std.builtin.os.tag) {
.linux => LinuxPoller,
.windows => WindowsPoller,
.macos, .openbsd, .freebsd, .netbsd, .dragonfly => BsdPoller,
else => @compileError("TODO: use select()"),
};
const WindowsPoller = @compileError("TODO: IOCP + AFD");
const LinuxPoller = struct {
epoll_fd: std.os.fd_t,
notify_fd: std.os.fd_t,
did_register_notify_fd: bool,
pub fn init() !LinuxPoller {
const epoll_fd = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC);
errdefer std.os.close(epoll_fd);
const notify_fd = try std.os.eventfd(0, std.os.EFD_CLOEXEC | std.os.EFD_NONBLOCK);
errdefer std.os.close(notify_fd);
return .{
.epoll_fd = epoll_fd,
.notify_fd = notify_fd,
.did_register_notify_fd = false,
};
}
pub fn deinit(self: *LinuxPoller) void {
std.os.close(self.notify_fd);
std.os.close(self.epoll_fd);
self.* = undefined;
}
pub fn notify(self: *LinuxPoller) void {
self.arm(self.notify_fd, 0, std.os.EPOLLOUT | std.os.EPOLLONESHOT) catch unreachable;
}
pub fn wait(self: *LinuxPoller, fd: std.os.fd_t, event: enum{ read, write }) !void {
var task = Task{ .data = @frame() };
var err: ?std.os.EpollCtlError = null;
suspend {
self.arm(fd, @ptrToInt(&task), switch (event) {
.read => std.os.EPOLLIN | std.os.EPOLLONESHOT | std.os.EPOLLRDHUP,
.write => std.os.EPOLLOUT | std.os.EPOLLONESHOT,
}) catch |e| {
err = e;
Loop.instance.?.push(&task);
};
}
if (err) |wait_err| {
return wait_err;
}
}
fn arm(self: *LinuxPoller, fd: std.os.fd_t, data: usize, flags: u32) !void {
var event = std.os.epoll_event{
.events = flags,
.data = .{ .ptr = data },
};
std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_MOD, fd, &event) catch |err| switch (err) {
error.FileDescriptorNotRegistered => {
try std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_ADD, fd, &event);
},
else => return err,
};
}
pub fn poll(self: *LinuxPoller) Poll {
var poll: Poll = undefined;
poll.active = std.os.epoll_wait(self.epoll_fd, &poll.events, -1);
poll.index = 0;
return poll;
}
pub const Poll = struct {
events: [64]std.os.epoll_event,
active: usize,
index: usize,
pub fn next(self: *Poll) ?usize {
if (self.index >= self.active) {
return null;
}
const event = self.events[self.index];
self.index += 1;
return event.data.ptr;
}
};
};
const BsdPoller = struct {
kqueue_fd: std.os.fd_t,
pub fn init() !BsdPoller {
return BsdPoller{ .kqueue_fd = try std.os.kqueue() };
}
pub fn deinit(self: *BsdPoller) void {
std.os.close(self.kqueue_fd);
self.* = undefined;
}
pub fn notify(self: *BsdPoller) void {
self.arm(self.kqueue_fd, 0, std.os.EVFILT_USER) catch unreachable;
}
pub fn wait(self: *BsdPoller, fd: std.os.fd_t, event: enum{ read, write }) !void {
var task = Task{ .data = @frame() };
var err: ?std.os.KEventError = null;
suspend {
self.arm(fd, @ptrToInt(&task), switch (event) {
.read => std.os.EVFILT_READ,
.write => std.os.EVFILT_WRITE,
}) catch |e| {
err = e;
Loop.instance.?.push(&task);
};
}
if (err) |wait_err| {
return wait_err;
}
}
fn arm(self: *BsdPoller, fd: std.os.fd_t, data: usize, filter: u32) !void {
var events: [1]std.os.Kevent = undefined;
events[0] = .{
.ident = fd,
.filter = filter,
.flags = std.os.EV_ADD | std.os.EV_ENABLE | std.os.EV_ONESHOT,
.fflags = 0,
.data = 0,
.udata = data,
};
_ = try std.os.kevent(
self.kqueue_fd,
&events,
&[0]std.os.Kevent{},
null,
);
}
pub fn poll(self: *BsdPoller) Poll {
var poll: Poll = undefined;
poll.index = 0;
poll.active = std.os.kevent(
self.kqueue_fd,
&[0]std.os.Kevent{},
&poll.events,
null,
) catch 0;
return poll;
}
pub const Poll = struct {
events: [64]std.os.Kevent,
active: usize,
index: usize,
pub fn next(self: *Poll) ?usize {
if (self.index >= self.active) {
return null;
}
const event = self.events[self.index];
self.index += 1;
return event.udata;
}
};
};
};
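// --- (separate gist file) ---
// A third take: the state enum becomes is_running/is_notified flags, and a futex-style
// sync namespace (RwLock, Condvar, Mutex, WaitGroup, Semaphore, Channel) plus a test
// harness and a send/recv benchmark are added.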
const std = @import("std");
pub const Loop = struct {
mutex: std.Thread.Mutex,
is_running: bool,
is_notified: bool,
event_poller: EventPoller,
spawned: usize,
threads: []*std.Thread,
allocator: *std.mem.Allocator,
run_queue: std.SinglyLinkedList(anyframe),
var global_instance: Loop = undefined;
pub const instance: ?*Loop = &global_instance;
pub fn init(self: *Loop, allocator: *std.mem.Allocator) !void {
self.* = .{
.mutex = .{},
.is_running = true,
.is_notified = false,
.event_poller = try EventPoller.init(),
.spawned = 0,
.threads = &[_]*std.Thread{},
.allocator = allocator,
.run_queue = .{},
};
errdefer self.deinit();
var num_threads: usize = 0;
if (!std.builtin.single_threaded)
num_threads = std.math.max(1, std.Thread.cpuCount() catch 1);
self.threads = try allocator.alloc(*std.Thread, num_threads);
for (self.threads) |*thread| {
thread.* = try std.Thread.spawn(Loop.run, self);
self.spawned += 1;
}
}
pub fn deinit(self: *Loop) void {
self.shutdown();
for (self.threads) |thread| {
thread.wait();
}
self.allocator.free(self.threads);
self.event_poller.deinit();
self.* = undefined;
}
pub fn shutdown(self: *Loop) void {
@atomicStore(bool, &self.is_running, false, .SeqCst);
self.notify();
}
pub fn run(self: *Loop) void {
var was_waiting = true;
while (true) {
if (self.pop()) |task| {
if (was_waiting) self.notify();
was_waiting = false;
resume task.data;
continue;
}
if (@atomicLoad(bool, &self.is_running, .SeqCst)) {
self.wait();
was_waiting = true;
continue;
}
self.notify();
return;
}
}
pub const Task = std.SinglyLinkedList(anyframe).Node;
pub fn push(self: *Loop, task: *Task) void {
{
const held = self.mutex.acquire();
defer held.release();
self.run_queue.prepend(task);
}
self.notify();
}
fn pop(self: *Loop) ?*Task {
const held = self.mutex.acquire();
defer held.release();
return self.run_queue.popFirst();
}
fn notify(self: *Loop) void {
if (@cmpxchgStrong(bool, &self.is_notified, false, true, .SeqCst, .SeqCst) == null) {
self.event_poller.notify();
}
}
fn wait(self: *Loop) void {
var events = self.event_poller.poll();
while (events.next()) |task_ptr| {
if (@intToPtr(?*Task, task_ptr)) |task| {
self.push(task);
} else {
@atomicStore(bool, &self.is_notified, false, .SeqCst);
}
}
}
const EventPoller = switch (std.builtin.os.tag) {
.linux => LinuxPoller,
.windows => WindowsPoller,
.macos, .openbsd, .freebsd, .netbsd, .dragonfly => BsdPoller,
else => @compileError("TODO: use select()"),
};
const WindowsPoller = @compileError("TODO: IOCP + AFD");
const LinuxPoller = struct {
epoll_fd: std.os.fd_t,
notify_fd: std.os.fd_t,
did_register_notify_fd: bool,
pub fn init() !LinuxPoller {
const epoll_fd = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC);
errdefer std.os.close(epoll_fd);
const notify_fd = try std.os.eventfd(0, std.os.EFD_CLOEXEC | std.os.EFD_NONBLOCK);
errdefer std.os.close(notify_fd);
return LinuxPoller{
.epoll_fd = epoll_fd,
.notify_fd = notify_fd,
.did_register_notify_fd = false,
};
}
pub fn deinit(self: *LinuxPoller) void {
std.os.close(self.notify_fd);
std.os.close(self.epoll_fd);
self.* = undefined;
}
pub fn notify(self: *LinuxPoller) void {
self.arm(self.notify_fd, 0, std.os.EPOLLOUT | std.os.EPOLLONESHOT) catch unreachable;
}
pub fn wait(self: *LinuxPoller, fd: std.os.fd_t, event: enum{ read, write }) !void {
var task = Task{ .data = @frame() };
var err: ?std.os.EpollCtlError = null;
suspend {
self.arm(fd, @ptrToInt(&task), switch (event) {
.read => std.os.EPOLLIN | std.os.EPOLLONESHOT | std.os.EPOLLRDHUP,
.write => std.os.EPOLLOUT | std.os.EPOLLONESHOT,
}) catch |e| {
err = e;
Loop.instance.?.push(&task);
};
}
if (err) |wait_err| {
return wait_err;
}
}
fn arm(self: *LinuxPoller, fd: std.os.fd_t, data: usize, flags: u32) !void {
var event = std.os.epoll_event{
.events = flags,
.data = .{ .ptr = data },
};
std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_MOD, fd, &event) catch |err| switch (err) {
error.FileDescriptorNotRegistered => {
try std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_ADD, fd, &event);
},
else => return err,
};
}
pub fn poll(self: *LinuxPoller) Polled {
var polled: Polled = undefined;
polled.active = std.os.epoll_wait(self.epoll_fd, &polled.events, -1);
polled.index = 0;
return polled;
}
pub const Polled = struct {
events: [64]std.os.epoll_event,
active: usize,
index: usize,
pub fn next(self: *Polled) ?usize {
if (self.index >= self.active) {
return null;
}
const event = self.events[self.index];
self.index += 1;
return event.data.ptr;
}
};
};
const BsdPoller = struct {
kqueue_fd: std.os.fd_t,
pub fn init() !BsdPoller {
return BsdPoller{ .kqueue_fd = try std.os.kqueue() };
}
pub fn deinit(self: *BsdPoller) void {
std.os.close(self.kqueue_fd);
self.* = undefined;
}
pub fn notify(self: *BsdPoller) void {
self.arm(self.kqueue_fd, 0, std.os.EVFILT_USER) catch unreachable;
}
pub fn wait(self: *BsdPoller, fd: std.os.fd_t, event: enum{ read, write }) !void {
var task = Task{ .data = @frame() };
var err: ?std.os.KEventError = null;
suspend {
self.arm(fd, @ptrToInt(&task), switch (event) {
.read => std.os.EVFILT_READ,
.write => std.os.EVFILT_WRITE,
}) catch |e| {
err = e;
Loop.instance.?.push(&task);
};
}
if (err) |wait_err| {
return wait_err;
}
}
fn arm(self: *BsdPoller, fd: std.os.fd_t, data: usize, filter: u32) !void {
var events: [1]std.os.Kevent = undefined;
events[0] = .{
.ident = fd,
.filter = filter,
.flags = std.os.EV_ADD | std.os.EV_ENABLE | std.os.EV_ONESHOT,
.fflags = 0,
.data = 0,
.udata = data,
};
_ = try std.os.kevent(
self.kqueue_fd,
&events,
&[0]std.os.Kevent{},
null,
);
}
pub fn poll(self: *BsdPoller) Polled {
var polled: Polled = undefined;
polled.index = 0;
polled.active = std.os.kevent(
self.kqueue_fd,
&[0]std.os.Kevent{},
&polled.events,
null,
) catch 0;
return polled;
}
pub const Polled = struct {
events: [64]std.os.Kevent,
active: usize,
index: usize,
pub fn next(self: *Polled) ?usize {
if (self.index >= self.active) {
return null;
}
const event = self.events[self.index];
self.index += 1;
return event.udata;
}
};
};
};
const sync = struct {
pub const RwLock = struct {
state: usize = 0,
const READING = 1;
const WRITING = std.math.maxInt(usize);
pub fn tryAcquire(self: *RwLock) ?Held {
if (@cmpxchgStrong(usize, &self.state, 0, WRITING, .SeqCst, .SeqCst) == null) {
return Held{ .lock = self };
} else {
return null;
}
}
pub fn acquire(self: *RwLock) Held {
while (true) {
if (self.tryAcquire()) |held| {
return held;
}
var waiter = Futex.wait(@ptrToInt(self));
switch (@atomicLoad(usize, &self.state, .SeqCst)) {
0 => waiter.cancel(),
else => waiter.wait(),
}
}
}
pub fn tryAcquireShared(self: *RwLock) ?Held {
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
if (state == WRITING) {
return null;
}
state = @cmpxchgWeak(
usize,
&self.state,
state,
state + READING,
.SeqCst,
.SeqCst,
) orelse return Held{
.is_shared = true,
.lock = self,
};
}
}
pub fn acquireShared(self: *RwLock) Held {
while (true) {
if (self.tryAcquireShared()) |held| {
return held;
}
var waiter = Futex.wait(@ptrToInt(self));
switch (@atomicLoad(usize, &self.state, .SeqCst)) {
WRITING => waiter.wait(),
else => waiter.cancel(),
}
}
}
pub const Held = struct {
is_shared: bool = false,
lock: *RwLock,
pub fn release(self: Held) void {
if (self.is_shared) {
self.lock.releaseShared();
} else {
self.lock.release();
}
}
};
pub fn release(self: *RwLock) void {
@atomicStore(usize, &self.state, 0, .SeqCst);
Futex.wake(@ptrToInt(self), 2);
}
pub fn releaseShared(self: *RwLock) void {
const state = @atomicRmw(usize, &self.state, .Sub, READING, .SeqCst);
if (state == READING) {
Futex.wake(@ptrToInt(self), 1);
}
}
};
pub const Condvar = struct {
was_notified: bool = false,
pub fn wait(self: *Condvar, held: Mutex.Held) void {
var waiter = Futex.wait(@ptrToInt(self));
if (@atomicLoad(bool, &self.was_notified, .SeqCst)) {
@atomicStore(bool, &self.was_notified, false, .SeqCst);
waiter.cancel();
return;
}
held.release();
waiter.wait();
_ = held.mutex.acquire();
}
pub fn signal(self: *Condvar) void {
@atomicStore(bool, &self.was_notified, true, .SeqCst);
Futex.wake(@ptrToInt(self), 1);
}
pub fn broadcast(self: *Condvar) void {
@atomicStore(bool, &self.was_notified, true, .SeqCst);
Futex.wake(@ptrToInt(self), std.math.maxInt(usize));
}
};
pub const Mutex = struct {
is_held: bool = false,
pub fn tryAcquire(self: *Mutex) ?Held {
if (@cmpxchgStrong(bool, &self.is_held, false, true, .SeqCst, .SeqCst) == null) {
return Held{ .mutex = self };
} else {
return null;
}
}
pub fn acquire(self: *Mutex) Held {
while (true) {
if (self.tryAcquire()) |held| {
return held;
}
var waiter = Futex.wait(@ptrToInt(self));
if (@atomicLoad(bool, &self.is_held, .SeqCst)) {
waiter.wait();
} else {
waiter.cancel();
}
}
}
pub const Held = struct {
mutex: *Mutex,
pub fn release(self: Held) void {
@atomicStore(bool, &self.mutex.is_held, false, .SeqCst);
Futex.wake(@ptrToInt(self.mutex), 1);
}
};
};
pub const WaitGroup = struct {
active: usize = 0,
pub fn init(active: usize) WaitGroup {
return WaitGroup{ .active = active };
}
pub fn done(self: *WaitGroup) void {
self.add(-1);
}
pub fn add(self: *WaitGroup, delta: isize) void {
var active: usize = undefined;
defer if (active == 0) {
Futex.wake(@ptrToInt(self), std.math.maxInt(usize));
};
if (delta < 0) {
active = @atomicRmw(usize, &self.active, .Sub, @intCast(usize, -delta), .SeqCst);
} else {
active = @atomicRmw(usize, &self.active, .Add, @intCast(usize, delta), .SeqCst);
}
if (delta < 0) {
active -= @intCast(usize, -delta);
} else {
active += @intCast(usize, delta);
}
}
pub fn wait(self: *WaitGroup) void {
while (true) {
if (@atomicLoad(usize, &self.active, .SeqCst) == 0) {
return;
}
var waiter = Futex.wait(@ptrToInt(self));
switch (@atomicLoad(usize, &self.active, .SeqCst)) {
0 => waiter.cancel(),
else => waiter.wait(),
}
}
}
};
pub const Semaphore = struct {
permits: usize = 0,
pub fn init(permits: usize) Semaphore {
return Semaphore{ .permits = permits };
}
pub fn post(self: *Semaphore) void {
_ = @atomicRmw(usize, &self.permits, .Add, 1, .SeqCst);
Futex.wake(@ptrToInt(self), 1);
}
pub fn tryWait(self: *Semaphore) bool {
var permits = @atomicLoad(usize, &self.permits, .SeqCst);
while (true) {
if (permits == 0) {
return false;
}
permits = @cmpxchgWeak(
usize,
&self.permits,
permits,
permits - 1,
.SeqCst,
.SeqCst,
) orelse return true;
}
}
pub fn wait(self: *Semaphore) void {
while (true) {
if (self.tryWait()) {
return;
}
var waiter = Futex.wait(@ptrToInt(self));
switch (@atomicLoad(usize, &self.permits, .SeqCst)) {
0 => waiter.wait(),
else => waiter.cancel(),
}
}
}
};
pub fn Channel(comptime T: type) type {
return struct {
mutex: Mutex = .{},
readers: Condvar = .{},
writers: Condvar = .{},
head: usize = 0,
size: usize = 0,
capacity: usize,
buffer: if (@sizeOf(T) > 0) [*]T else void,
const Self = @This();
pub fn init(items: []T) Self {
return Self{
.capacity = items.len,
.buffer = if (@sizeOf(T) > 0) items.ptr else {},
};
}
pub fn tryWriteItem(self: *Self, item: T) bool {
return self.tryWrite(&[_]T{ item }) == 1;
}
pub fn tryReadItem(self: *Self) ?T {
var items: [1]T = undefined;
return if (self.tryRead(&items) == 1) items[0] else null;
}
pub fn writeItem(self: *Self, item: T) void {
return self.write(&[_]T{ item });
}
pub fn readItem(self: *Self) T {
var items: [1]T = undefined;
self.read(&items);
return items[0];
}
pub fn tryWrite(self: *Self, items: []const T) usize {
return self.writeItems(items, false);
}
pub fn tryRead(self: *Self, items: []T) usize {
return self.readItems(items, false);
}
pub fn write(self: *Self, items: []const T) void {
std.debug.assert(self.writeItems(items, true) == items.len);
}
pub fn read(self: *Self, items: []T) void {
std.debug.assert(self.readItems(items, true) == items.len);
}
fn writeItems(self: *Self, items: []const T, comptime can_block: bool) usize {
const held = self.mutex.acquire();
defer held.release();
var produced: usize = 0;
while (produced < items.len) {
if (self.size < self.capacity) {
if (@sizeOf(T) > 0) {
self.buffer[(self.head +% self.size) % self.capacity] = items[produced];
}
self.readers.signal();
self.size += 1;
produced += 1;
continue;
}
switch (can_block) {
true => self.writers.wait(held),
else => break,
}
}
return produced;
}
fn readItems(self: *Self, items: []T, comptime can_block: bool) usize {
const held = self.mutex.acquire();
defer held.release();
var consumed: usize = 0;
while (consumed < items.len) {
if (self.size > 0) {
if (@sizeOf(T) > 0) {
items.ptr[consumed] = self.buffer[self.head % self.capacity];
}
self.writers.signal();
self.head +%= 1;
self.size -= 1;
consumed += 1;
continue;
}
switch (can_block) {
true => self.readers.wait(held),
else => break,
}
}
return consumed;
}
};
}
pub fn yield() void {
suspend {
var task = Loop.Task{ .data = @frame() };
Futex.getLoop().push(&task);
}
}
const Futex = struct {
fn getLoop() *Loop {
return Loop.instance orelse {
@compileError("Must have an event loop enabled");
};
}
const Queue = std.TailQueue(struct {
address: usize,
task: Loop.Task,
});
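// Waiters hash by address into 256 fixed buckets; wake() walks the bucket queue and only resumes nodes whose address matches, so bucket collisions are harmless.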
const Bucket = struct {
mutex: std.Thread.Mutex = .{},
queue: Queue = .{},
waiters: usize = 0,
var array = [_]Bucket{.{}} ** 256;
fn from(address: usize) *Bucket {
return &array[address % array.len];
}
};
pub const Waiter = struct {
held: @TypeOf(@as(std.Thread.Mutex, undefined).impl).Held,
bucket: *Bucket,
node: Queue.Node,
pub fn cancel(self: *Waiter) void {
_ = @atomicRmw(usize, &self.bucket.waiters, .Sub, 1, .SeqCst);
self.held.release();
}
pub fn wait(self: *Waiter) void {
suspend {
self.node.data.task = Loop.Task{ .data = @frame() };
self.bucket.queue.append(&self.node);
self.held.release();
}
}
};
pub fn wait(address: usize) Waiter {
var waiter: Waiter = undefined;
waiter.node.data.address = address;
waiter.bucket = Bucket.from(address);
waiter.held = waiter.bucket.mutex.acquire();
_ = @atomicRmw(usize, &waiter.bucket.waiters, .Add, 1, .SeqCst);
return waiter;
}
pub fn wake(address: usize, max_waiters: usize) void {
var awoken = Queue{};
defer while (awoken.pop()) |node| {
getLoop().push(&node.data.task);
};
const bucket = Bucket.from(address);
if (@atomicLoad(usize, &bucket.waiters, .SeqCst) == 0) {
return;
}
const held = bucket.mutex.acquire();
defer held.release();
var woke_up: usize = 0;
defer if (woke_up > 0) {
_ = @atomicRmw(usize, &bucket.waiters, .Sub, woke_up, .SeqCst);
};
var current_node = bucket.queue.first;
while (current_node) |node| {
current_node = node.next;
if (node.data.address != address) {
continue;
}
bucket.queue.remove(node);
awoken.append(node);
woke_up += 1;
if (woke_up >= max_waiters) {
break;
}
}
}
};
};
pub fn main() !void {
try testHarness(testSendRecv);
}
fn testHarness(comptime asyncFn: anytype) !@typeInfo(@TypeOf(asyncFn)).Fn.return_type.? {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
var heap = if (std.builtin.os.tag == .windows) std.heap.HeapAllocator.init() else {};
const allocator = if (std.builtin.link_libc)
std.heap.c_allocator
else if (std.builtin.os.tag == .windows)
&heap.allocator
else
&gpa.allocator;
const loop = Loop.instance.?;
try loop.init(allocator);
defer loop.deinit();
const Wrapper = struct {
fn entry(l: *Loop) @typeInfo(@TypeOf(asyncFn)).Fn.return_type.? {
defer l.shutdown();
return asyncFn();
}
};
var frame = async Wrapper.entry(loop);
loop.run();
return nosuspend await frame;
}
fn testSendRecv() !void {
const ITEMS = 1024;
const PRODUCE = 1_000_000;
const CONSUME = PRODUCE;
const Consumer = struct {
fn run(ch: *sync.Channel(u8), wg: *sync.WaitGroup) void {
sync.yield();
defer wg.done();
var items: usize = CONSUME;
while (items > 0) : (items -= 1) {
// std.debug.warn("reading {}\n", .{items});
_ = ch.readItem();
}
}
};
const allocator = std.heap.page_allocator;
const buffer = try allocator.alloc(u8, ITEMS);
defer allocator.free(buffer);
var ch = sync.Channel(u8).init(buffer);
var wg = sync.WaitGroup.init(1);
defer wg.wait();
var consumer = async Consumer.run(&ch, &wg);
defer await consumer;
var items: usize = PRODUCE;
while (items > 0) : (items -= 1) {
// std.debug.warn("writing {}\n", .{items});
ch.writeItem(0);
}
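// For comparison, the roughly equivalent Go channel benchmark: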
// ch := make(chan byte, 4096)
// wg := sync.WaitGroup{}
// wg.Add(1)
// go func() {
// defer wg.Done()
// for range ch {
// }
// }()
// b.SetBytes(1)
// b.ReportAllocs()
// b.ResetTimer()
// for i := 0; i < b.N; i++ {
// ch <- byte(i)
// }
// close(ch)
// wg.Wait()
}
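// --- (separate gist file) ---
// A work-in-progress thread-pool Scheduler: prioritized run queues, a timer treap,
// and an intrusive MPSC queue; several pieces (System, Worker.wait/notify, pollEvents)
// are still stubs or TODOs.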
const std = @import("std");
pub const Scheduler = struct {
counter: usize = 0,
max_workers: usize,
main_worker: ?*Worker = null,
thread_config: System.ThreadConfig,
idle_lock: System.Lock = .{},
idle_workers: ?*Worker = null,
spawned_workers: ?*Worker = null,
run_queues: Task.Priority.Array(Node.Queue) = Task.Priority.arrayInit(Node.Queue),
pub const Config = struct {
max_threads: usize,
stack_size: usize,
};
pub fn init(config: Config) Scheduler {
return Scheduler{
.max_workers = if (std.builtin.single_threaded) 1 else std.math.max(1, config.max_threads),
.thread_config = .{
.stack_size = std.math.max(std.mem.page_size, config.stack_size),
},
};
}
pub fn run(self: *Scheduler) void {
self.notify(true);
}
pub const Task = struct {
node: Node = undefined,
callback: Callback,
pub const Callback = fn(*Worker, *Task) callconv(.C) void;
pub const Affinity = struct {
priority: Priority = .Normal,
worker: ?*Worker = null,
};
pub const Priority = enum(u2) {
Low = 0,
Normal = 1,
High = 2,
Handoff = 3,
fn Array(comptime T: type) type {
return [3]T;
}
fn arrayInit(comptime T: type) Array(T) {
return [_]T{.{}} ** 3;
}
fn toArrayIndex(self: Priority) usize {
return switch (self) {
.Handoff, .High => 2,
.Normal => 1,
.Low => 0,
};
}
};
pub fn init(callback: Callback) Task {
return Task{ .callback = callback };
}
pub const Group = struct {
batch: Node.Batch = .{},
pub fn from(task: *Task) Group {
return Group{ .batch = Node.Batch.from(&task.node) };
}
pub fn isEmpty(self: Group) bool {
return self.batch.isEmpty();
}
pub fn append(self: *Group, group: Group) void {
self.batch.push(group.batch, .fifo);
}
pub fn prepend(self: *Group, group: Group) void {
self.batch.push(group.batch, .lifo);
}
pub fn popFirst(self: *Group) ?*Task {
const node = self.batch.pop() orelse return null;
return @fieldParentPtr(Task, "node", node);
}
pub fn iter(self: Group) Iter {
return Iter{ .node = self.batch.head };
}
pub const Iter = struct {
node: ?*Node,
pub fn next(self: *Iter) ?*Task {
const node = self.node orelse return null;
self.node = node.next;
return @fieldParentPtr(Task, "node", node);
}
};
};
};
pub fn schedule(self: *Scheduler, group: Task.Group, affinity: Task.Affinity) void {
if (group.isEmpty()) {
return;
}
if (affinity.worker) |worker| {
worker.run_owned.push(group.batch);
worker.notify();
return;
}
self.run_queues[affinity.priority.toArrayIndex()].push(group.batch);
self.notify(false);
}
const Counter = struct {
state: State = .uninit,
idle: usize = 0,
spawned: usize = 0,
const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 2, 2));
const State = enum(u2) {
uninit = 0,
pending,
notified,
shutdown,
};
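// A Counter packs into a single usize: state in the low 2 bits, then idle, then spawned (each Count bits wide).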
fn pack(self: Counter) usize {
var value: usize = 0;
value |= @enumToInt(self.state);
value |= @as(usize, @intCast(Count, self.idle)) << 2;
value |= @as(usize, @intCast(Count, self.spawned)) << (2 + std.meta.bitCount(Count));
return value;
}
fn unpack(value: usize) Counter {
return Counter{
.state = @intToEnum(State, @truncate(std.meta.Tag(State), value)),
.idle = @truncate(Count, value >> 2),
.spawned = @truncate(Count, value >> (2 + std.meta.bitCount(Count))),
};
}
};
pub fn shutdown(self: *Scheduler) void {
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
while (true) {
if (counter.state == .uninit) {
return;
}
if (counter.state == .shutdown) {
return;
}
var new_counter = counter;
new_counter.state = .shutdown;
new_counter.idle = 0;
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.AcqRel,
.Monotonic,
)) |updated| {
spinLoopHint();
counter = Counter.unpack(updated);
continue;
}
if (counter.idle > 0) {
self.idleNotify(.all);
}
return;
}
}
fn markShutdown(self: *Scheduler) void {
var counter = Counter{ .spawned = 1 };
counter = Counter.unpack(@atomicRmw(usize, &self.counter, .Sub, counter.pack(), .AcqRel));
if (counter.spawned == 1 and counter.state == .shutdown) {
const main_worker = self.main_worker orelse unreachable;
main_worker.stop();
}
}
fn markSpawned(self: *Scheduler) bool {
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
while (true) {
switch (counter.state) {
.uninit => unreachable,
.pending => return true,
.notified => {},
.shutdown => {
self.markShutdown();
return false;
},
}
var new_counter = counter;
new_counter.state = .pending;
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.AcqRel,
.Monotonic,
)) |updated| {
spinLoopHint();
counter = Counter.unpack(updated);
continue;
}
return true;
}
}
fn notify(self: *Scheduler, initialize: bool) void {
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
while (true) {
const can_notify = switch (counter.state) {
.uninit => initialize,
.pending => true,
.notified => false,
.shutdown => false,
};
if (!can_notify) {
return;
}
var new_counter = counter;
new_counter.state = .notified;
if (counter.idle == 0 and (counter.spawned < self.max_workers)) {
new_counter.spawned += 1;
}
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.AcqRel,
.Monotonic,
)) |updated| {
counter = Counter.unpack(updated);
continue;
}
if (counter.idle > 0) {
self.idleNotify(.one);
return;
}
if (counter.spawned < new_counter.spawned) {
if (initialize or std.builtin.single_threaded) {
Worker.run(self, null);
return;
}
if (!Worker.spawn(self)) {
self.markShutdown();
}
}
return;
}
}
fn wait(self: *Scheduler, worker: *Worker) error{Shutdown}!bool {
var is_idle = false;
var has_local = false;
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
while (true) {
std.debug.assert(counter.state != .uninit);
if (counter.state == .shutdown) {
self.markShutdown();
return error.Shutdown;
}
if (counter.state == .notified or !is_idle or has_local) {
var new_counter = counter;
new_counter.state = .pending;
if (!is_idle) {
new_counter.idle += 1;
} else if (has_local) {
new_counter.idle -= 1;
}
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.AcqRel,
.Monotonic,
)) |updated| {
spinLoopHint();
counter = Counter.unpack(updated);
continue;
}
if (counter.state == .notified) {
return true;
} else if (has_local) {
return false;
} else {
is_idle = true;
}
}
has_local = self.idleWait(worker);
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
}
}
fn idleWait(self: *Scheduler, worker: *Worker) bool {
{
self.idle_lock.acquire();
defer self.idle_lock.release();
@atomicStore(bool, &worker.idle_pending, true, .Monotonic);
worker.idle_prev = null;
worker.idle_next = self.idle_workers;
self.idle_workers = worker;
}
const has_local = worker.wait();
self.idleCancel(worker);
return has_local;
}
fn idleCancel(self: *Scheduler, worker: *Worker) void {
if (@atomicLoad(bool, &worker.idle_pending, .Monotonic)) {
self.idle_lock.acquire();
defer self.idle_lock.release();
if (@atomicLoad(bool, &worker.idle_pending, .Monotonic)) {
@atomicStore(bool, &worker.idle_pending, false, .Monotonic);
if (worker.idle_next) |next| {
next.idle_prev = worker.idle_prev;
}
if (worker.idle_prev) |prev| {
prev.idle_next = worker.idle_next;
}
if (worker == self.idle_workers) {
self.idle_workers = worker.idle_next;
}
}
}
}
fn idleNotify(self: *Scheduler, scope: enum{ one, all }) void {
var idle_workers: ?*Worker = null;
defer while (idle_workers) |worker| {
idle_workers = worker.idle_next;
worker.notify();
};
self.idle_lock.acquire();
defer self.idle_lock.release();
var max_wake: usize = switch (scope) {
.one => 1,
.all => std.math.maxInt(usize),
};
while (max_wake > 0) : (max_wake -= 1) {
const worker = self.idle_workers orelse break;
self.idle_workers = worker.idle_next;
if (self.idle_workers) |next_worker| {
next_worker.idle_prev = null;
}
@atomicStore(bool, &worker.idle_pending, false, .Monotonic);
worker.idle_next = idle_workers;
idle_workers = worker;
}
}
pub const Worker = struct {
scheduler: *Scheduler,
thread: ?System.Thread,
timer: Timer = .{},
join_ptr: usize = 0,
idle_ptr: usize = 0,
idle_pending: bool = false,
idle_prev: ?*Worker = undefined,
idle_next: ?*Worker = undefined,
spawned_prev: ?*Worker = undefined,
spawned_next: ?*Worker = undefined,
steal_target: ?*Worker = null,
run_next: Node.Batch = .{},
run_owned: Node.Queue = .{},
run_queues: Task.Priority.Array(Node.Queue) = Task.Priority.arrayInit(Node.Queue),
fn spawn(scheduler: *Scheduler) bool {
const Spawner = struct {
put_event: System.Event,
got_event: System.Event,
sched: *Scheduler,
thread: System.Thread,
fn runWorker(self_ptr: usize) void {
const self = @intToPtr(*Spawner, self_ptr);
self.put_event.wait(null) catch unreachable;
const sched = self.sched;
const thread = self.thread;
self.got_event.set();
Worker.run(sched, thread);
}
};
var spawner: Spawner = undefined;
spawner.put_event.init();
spawner.got_event.init();
defer {
spawner.put_event.deinit();
spawner.got_event.deinit();
}
spawner.sched = scheduler;
spawner.thread = System.Thread.spawn(
scheduler.thread_config,
Spawner.runWorker,
@ptrToInt(&spawner),
) catch return false;
spawner.put_event.set();
spawner.got_event.wait(null) catch unreachable;
return true;
}
fn run(scheduler: *Scheduler, thread: ?System.Thread) void {
var self = Worker{
.scheduler = scheduler,
.thread = thread,
};
var spawned_workers = @atomicLoad(?*Worker, &scheduler.spawned_workers, .Monotonic);
while (true) {
self.spawned_prev = null;
self.spawned_next = spawned_workers;
spawned_workers = @cmpxchgWeak(
?*Worker,
&scheduler.spawned_workers,
spawned_workers,
&self,
.Release,
.Monotonic,
) orelse break;
}
if (spawned_workers) |next_worker| {
next_worker.spawned_prev = &self;
} else {
scheduler.main_worker = &self;
}
defer {
self.join();
if (@atomicLoad(?*Worker, &self.spawned_prev, .Monotonic)) |prev_worker| {
prev_worker.stop();
}
}
var was_idle = true;
if (!scheduler.markSpawned()) {
return;
}
var run_tick: u8 = 0;
while (true) {
@atomicStore(usize, &self.idle_ptr, 0, .Monotonic);
if (self.poll(run_tick == 127)) |node| {
if (was_idle) scheduler.notify(false);
was_idle = false;
run_tick +%= 1;
const task = @fieldParentPtr(Task, "node", node);
(task.callback)(&self, task);
continue;
}
was_idle = true;
_ = scheduler.wait(&self) catch break;
}
}
pub fn getScheduler(self: Worker) *Scheduler {
return self.scheduler;
}
pub fn schedule(self: *Worker, group: Task.Group, affinity: Task.Affinity) void {
if (group.isEmpty()) {
return;
}
if (affinity.worker) |worker| {
if (worker == self) {
self.run_next.push(group.batch, .fifo);
return;
}
worker.run_owned.push(group.batch);
worker.notify();
return;
}
if (affinity.priority == .Handoff) {
self.run_next.push(group.batch, .lifo);
return;
}
self.run_queues[affinity.priority.toArrayIndex()].push(group.batch);
self.getScheduler().notify(false);
}
fn poll(self: *Worker, be_fair: bool) ?*Node {
const scheduler = self.getScheduler();
if (be_fair) {
if (self.pollEvents()) |node| return node;
if (self.pollQueues(&scheduler.run_queues, true)) |node| return node;
if (self.pollQueues(&self.run_queues, true)) |node| return node;
}
if (self.pollLocal(be_fair)) |node| {
return node;
}
const steal_attempts: u8 = 4;
var attempts: u8 = steal_attempts;
while (attempts != 0) : (attempts -= 1) {
if (self.pollQueues(&self.run_queues, false)) |node| {
return node;
}
if (self.pollQueues(&scheduler.run_queues, false)) |node| {
return node;
}
var iter = Counter.unpack(@atomicLoad(usize, &scheduler.counter, .Monotonic)).spawned;
while (iter != 0) : (iter -= 1) {
const target = self.steal_target
orelse @atomicLoad(?*Worker, &scheduler.spawned_workers, .Acquire)
orelse unreachable;
if (target != self) {
const be_aggresive = attempts <= (steal_attempts / 2);
if (self.pollSteal(target, be_aggresive)) |node| {
self.steal_target = target;
return node;
}
}
self.steal_target = target.spawned_next;
}
}
return self.pollEvents();
}
fn pollNext(self: *Worker) ?*Node {
return self.run_next.pop();
}
fn pollOwned(self: *Worker) ?*Node {
return self.run_owned.popAssumeOwned();
}
fn pollLocal(self: *Worker, be_fair: bool) ?*Node {
const PollType = enum{ next, owned, timers };
var poll_order = [_]PollType{ .next, .owned, .timers };
if (be_fair) {
poll_order = [_]PollType{ .timers, .owned, .next };
}
for (poll_order) |poll_type| {
if (switch (poll_type) {
.next => self.pollNext(),
.owned => self.pollOwned(),
.timers => self.pollTimers(self),
}) |node| {
return node;
}
}
return null;
}
fn pollSteal(self: *Worker, target: *Worker, be_aggresive: bool) ?*Node {
if (self.pollQueues(&target.run_queues, false)) |node| {
return node;
}
if (be_aggresive) {
if (self.pollTimers(target)) |node| {
return node;
}
}
return null;
}
fn pollQueues(self: *Worker, queues: *Task.Priority.Array(Node.Queue), be_fair: bool) ?*Node {
var poll_order = [_]Task.Priority{ .High, .Normal, .Low };
if (be_fair) {
poll_order = [_]Task.Priority{ .Low, .Normal, .High };
}
for (poll_order) |priority| {
if (queues[priority.toArrayIndex()].pop()) |node| {
return node;
}
}
return null;
}
fn pollTimers(self: *Worker, target: *Worker) ?*Node {
var group = switch (target.timer.poll()) {
.expired => |expired| expired,
else => return null,
};
const first_task = group.popFirst() orelse return null;
self.run_queues[Task.Priority.High.toArrayIndex()].push(group.batch);
return &first_task.node;
}
fn pollEvents(self: *Worker) ?*Node {
// TODO: hook up an I/O poller; nothing to report yet.
return null;
}
fn wait(self: *Worker) bool {
@compileError("TODO");
}
fn notify(self: *Worker) void {
@compileError("TODO");
}
fn join(self: *Worker) void {
var event: System.Event = undefined;
var has_event = false;
defer if (has_event) {
event.deinit();
}
var join_ptr = @atomicLoad(usize, &self.join_ptr, .Acquire);
while (true) {
if (join_ptr == 1) {
return;
}
if (!has_event) {
has_event = true;
event.init();
}
join_ptr = @cmpxchgWeak(
usize,
&self.join_ptr,
join_ptr,
@ptrToInt(&event),
.AcqRel,
.Acquire,
) orelse {
event.wait(null) catch unreachable;
return;
};
}
}
fn stop(self: *Worker) void {
const join_ptr = @atomicRmw(usize, &self.join_ptr, .Xchg, 1, .AcqRel);
if (@intToPtr(?*System.Event, join_ptr)) |event| {
event.set();
}
}
};
const Timer = struct {
tree_lock: System.Lock = System.Lock{},
has_pending: bool = false,
tree: Tree = .{},
const Tree = Treap(struct {
deadline: System.Instant,
pub const use_min = true;
const Key = @This();
pub fn isLessThan(left: Key, right: Key) bool {
if (isEqual(left, right)) return false;
return right.deadline.elapsed(left.deadline) != null;
}
pub fn isEqual(left: Key, right: Key) bool {
return right.deadline.isEqual(left.deadline);
}
pub fn isMoreImportant(left: Key, right: Key) bool {
return !isLessThan(left, right);
}
});
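// Timers whose deadlines round to the same millisecond share one tree node; schedule() chains the extras onto that node's prev/next/tail list rather than inserting duplicate keys into the treap.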
pub const Node = struct {
tree_node: Tree.Node,
is_pending: bool,
group: Task.Group,
timer: *Timer,
prev: ?*Node,
next: ?*Node,
tail: *Node,
};
pub fn schedule(self: *Timer, node: *Node, deadline: System.Instant, group: Task.Group) void {
self.tree_lock.acquire();
defer self.tree_lock.release();
node.prev = null;
node.next = null;
node.tail = node;
node.timer = self;
node.group = group;
@atomicStore(bool, &node.is_pending, true, .Monotonic);
@atomicStore(bool, &self.has_pending, true, .Monotonic);
const deadline_ms = deadline.round(std.time.ns_per_ms);
const key = Tree.Key{ .deadline = deadline_ms };
const lookup = self.tree.find(key);
const head_node = lookup.node orelse {
self.tree.insert(lookup, &node.tree_node, key);
return;
};
const head = @fieldParentPtr(Node, "tree_node", head_node);
node.tree_node = head.tree_node;
node.prev = head.tail;
head.tail.next = node;
head.tail = node;
}
pub fn tryCancel(self: *Timer, node: *Node) bool {
if (!@atomicLoad(bool, &node.is_pending, .Monotonic)) {
return false;
}
self.tree_lock.acquire();
defer self.tree_lock.release();
if (!@atomicLoad(bool, &node.is_pending, .Monotonic)) {
return false;
}
if (node.prev) |prev| {
prev.next = node.next;
if (node.next) |next| {
next.prev = prev;
} else {
const lookup = self.tree.find(node.tree_node.key);
const tree_node = lookup.node orelse unreachable;
const head = @fieldParentPtr(Node, "tree_node", tree_node);
std.debug.assert(head.tail == node);
head.tail = prev;
}
} else if (node.next) |next| {
self.tree.replace(&node.tree_node, &next.tree_node);
next.tail = node.tail;
next.prev = null;
} else {
self.tree.remove(&node.tree_node);
}
@atomicStore(bool, &node.is_pending, false, .Monotonic);
if (self.tree.peekMin() == null) {
@atomicStore(bool, &self.has_pending, false, .Monotonic);
}
return true;
}
pub const Poll = union(enum){
empty,
wait: u64,
expired: Task.Group,
};
pub fn poll(self: *Timer) Poll {
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) {
return .empty;
}
if (!self.tree_lock.tryAcquire()) return .empty;
defer self.tree_lock.release();
if (!@atomicLoad(bool, &self.has_pending, .Monotonic)) {
return .empty;
}
var expired = Task.Group{};
var next_delay: ?u64 = null;
var timestamp: ?System.Instant = null;
while (true) {
const tree_node = self.tree.peekMin() orelse {
@atomicStore(bool, &self.has_pending, false, .Monotonic);
break;
};
const now = timestamp orelse blk: {
timestamp = System.Instant.now();
break :blk timestamp orelse unreachable;
};
if (now.elapsed(tree_node.key.deadline)) |_| {
self.tree.remove(tree_node);
} else {
next_delay = tree_node.key.deadline.elapsed(now);
break;
}
var nodes: ?*Node = @fieldParentPtr(Node, "tree_node", tree_node);
while (nodes) |node| {
nodes = node.next;
@atomicStore(bool, &node.is_pending, false, .Monotonic);
expired.append(node.group);
}
}
if (!expired.isEmpty()) return Poll{ .expired = expired };
if (next_delay) |delay| return Poll{ .wait = delay };
return .empty;
}
};
const Node = struct {
next: usize,
pub const Batch = struct {
head: ?*Node = null,
tail: *Node = undefined,
pub fn from(node: *Node) Batch {
node.next = 0;
return Batch{
.head = node,
.tail = node,
};
}
pub fn isEmpty(self: Batch) bool {
return self.head == null;
}
pub fn push(self: *Batch, batch: Batch, order: enum{ lifo, fifo }) void {
if (batch.isEmpty()) {
return;
}
if (self.isEmpty()) {
self.* = batch;
return;
}
switch (order) {
.lifo => {
batch.tail.next = @ptrToInt(self.head);
self.head = batch.head;
},
.fifo => {
self.tail.next = @ptrToInt(batch.head);
self.tail = batch.tail;
},
}
}
pub fn pop(self: *Batch) ?*Node {
const node = self.head orelse return null;
self.head = @intToPtr(?*Node, node.next);
return node;
}
};
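// Queue: an intrusive MPSC queue in the style of Vyukov's algorithm.
// `next` is stored as a usize so the consumer can tag bit 0 of `head.next`
// while it is popping; producers only swap `tail` and link through the
// previous tail's `next`.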
pub const Queue = struct {
head: Node = .{ .next = 0 },
tail: ?*Node = null,
pub fn isEmpty(self: *const Queue) bool {
return @atomicLoad(usize, &self.head.next, .Monotonic) == 0;
}
pub fn push(self: *Queue, batch: Batch) void {
if (batch.isEmpty()) {
return;
}
const old_tail = @atomicRmw(?*Node, &self.tail, .Xchg, batch.tail, .AcqRel);
const tail = old_tail orelse &self.head;
@atomicStore(usize, &tail.next, @ptrToInt(batch.head), .Release);
}
pub fn popAssumeOwned(self: *Queue) ?*Node {
const next = @atomicLoad(usize, &self.head.next, .Monotonic);
if (next == 0) {
return null;
}
std.debug.assert(next & 1 == 0);
return self.popWithAcquired(@intToPtr(*Node, next));
}
pub fn pop(self: *Queue) ?*Node {
var next = @atomicLoad(usize, &self.head.next, .Monotonic);
while (true) : (spinLoopHint()) {
if ((next == 0) or (next & 1 != 0)) {
return null;
}
next = @cmpxchgWeak(
usize,
&self.head.next,
next,
next | 1,
.Acquire,
.Monotonic,
) orelse return self.popWithAcquired(@intToPtr(*Node, next));
}
}
fn popWithAcquired(self: *Queue, head: *Node) ?*Node {
var next = @atomicLoad(usize, &head.next, .Acquire);
if (next != 0) {
@atomicStore(usize, &self.head.next, next, .Monotonic);
return head;
}
@atomicStore(usize, &self.head.next, 0, .Monotonic);
if (@cmpxchgStrong(?*Node, &self.tail, head, &self.head, .AcqRel, .Acquire) == null) {
return null;
}
var spin: u8 = 10;
while (spin > 0) : (spin -= 1) {
next = @atomicLoad(usize, &head.next, .Acquire);
if (next != 0) {
@atomicStore(usize, &self.head.next, next, .Monotonic);
return head;
}
}
@atomicStore(usize, &self.head.next, @ptrToInt(head), .Monotonic);
return null;
}
};
};
};
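// Treap: a binary search tree that also keeps the heap property on the same
// key (no separate random priority). Parent links are packed into a usize
// with the `is_left` flag in bit 0, and an optional cached minimum node is
// maintained when `Key.use_min` is set.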
fn Treap(comptime KeyType: type) type {
return struct {
root: ?*Node = null,
min: @TypeOf(min_init) = min_init,
const Self = @This();
pub const Key = KeyType;
const min_init = if (Key.use_min) @as(?*Node, null) else {};
const Parent = struct {
node: *Node,
is_left: bool,
fn pack(parent: ?Parent) usize {
const self = parent orelse return 0;
return @ptrToInt(self.node) | @as(usize, @boolToInt(self.is_left));
}
fn unpack(value: usize) ?Parent {
return Parent{
.node = @intToPtr(?*Node, value & ~@as(usize, 0b1)) orelse return null,
.is_left = value & 1 != 0,
};
}
};
pub const Node = struct {
key: Key,
parent: usize,
children: [2]?*Node,
};
pub const Lookup = struct {
node: ?*Node,
parent: ?Parent,
};
pub fn find(self: *Self, key: Key) Lookup {
var lookup = Lookup{
.node = self.root,
.parent = null,
};
while (true) {
const node = lookup.node orelse break;
if (Key.isEqual(key, node.key))
break;
const is_left = Key.isLessThan(key, node.key);
lookup = Lookup{
.node = node.children[@boolToInt(!is_left)],
.parent = Parent{
.node = node,
.is_left = is_left,
},
};
}
return lookup;
}
pub fn insert(self: *Self, lookup: Lookup, node: *Node, key: Key) void {
node.* = Node{
.key = key,
.parent = Parent.pack(lookup.parent),
.children = [2]?*Node{ null, null },
};
if (Key.use_min) {
if (self.min) |min_node| {
if (Key.isLessThan(key, min_node.key)) {
self.min = node;
}
} else {
self.min = node;
}
}
if (lookup.parent) |parent| {
parent.node.children[@boolToInt(!parent.is_left)] = node;
} else {
self.root = node;
}
self.fixupInsert(node);
}
pub fn remove(self: *Self, node: *Node) void {
if (Key.use_min and self.min == node) {
self.min = self.findNextMin(node);
}
self.fixupRemove(node);
if (Parent.unpack(node.parent)) |parent| {
parent.node.children[@boolToInt(!parent.is_left)] = null;
} else {
self.root = null;
}
}
fn fixupInsert(self: *Self, node: *Node) void {
while (true) {
const parent = Parent.unpack(node.parent) orelse break;
if (!Key.isMoreImportant(parent.node.key, node.key))
break;
const children = &parent.node.children;
std.debug.assert(node == children[0] or node == children[1]);
self.rotate(parent.node, node == children[1]);
}
}
fn fixupRemove(self: *Self, node: *Node) void {
while (true) {
if (node.children[0] == null and node.children[1] == null)
break;
self.rotate(node, blk: {
const right = node.children[1] orelse break :blk false;
const left = node.children[0] orelse break :blk true;
break :blk !Key.isMoreImportant(left.key, right.key);
});
}
}
fn rotate(self: *Self, node: *Node, is_left: bool) void {
std.debug.assert(self.root != null);
std.debug.assert(node.children[@boolToInt(is_left)] != null);
const parent = Parent.unpack(node.parent);
const swap_node = node.children[@boolToInt(is_left)] orelse unreachable;
const child_node = swap_node.children[@boolToInt(!is_left)];
swap_node.children[@boolToInt(!is_left)] = node;
node.parent = Parent.pack(Parent{
.node = swap_node,
.is_left = is_left,
});
node.children[@boolToInt(is_left)] = child_node;
if (child_node) |child| {
child.parent = Parent.pack(Parent{
.node = node,
.is_left = !is_left,
});
}
swap_node.parent = Parent.pack(parent);
if (parent) |p| {
const children = &p.node.children;
std.debug.assert(node == children[0] or node == children[1]);
children[@boolToInt(node == children[1])] = swap_node;
} else {
self.root = swap_node;
}
}
pub fn replace(self: *Self, node: *Node, new_node: ?*Node) void {
if (new_node) |new|
new.* = node.*;
if (Key.use_min and self.min == node) {
if (new_node) |new| {
self.min = new;
} else {
self.min = self.findNextMin(node);
}
}
if (Parent.unpack(node.parent)) |parent| {
parent.node.children[@boolToInt(!parent.is_left)] = new_node;
} else {
self.root = new_node;
}
for (node.children) |*child_node, index| {
const child = child_node orelse continue;
child.parent = Parent.pack(blk: {
break :blk Parent{
.node = new_node orelse break :blk null,
.is_left = index == 0,
};
});
}
}
fn findNextMin(self: *Self, node: *Node) ?*Node {
std.debug.assert(self.min == node);
std.debug.assert(node.children[0] == null);
var next_min = blk: {
if (node.children[1]) |right|
break :blk right;
const parent = Parent.unpack(node.parent) orelse return null;
std.debug.assert(node == parent.node.children[0]);
break :blk parent.node.children[1] orelse parent.node;
};
while (next_min.children[0]) |left_node| {
if (left_node == node)
break;
next_min = left_node;
}
return next_min;
}
pub fn peekMin(self: *Self) ?*Node {
if (!Key.use_min)
@compileError("Treap not configured for priority operations");
return self.min;
}
};
}
const System = struct {
const is_linux = std.builtin.os.tag == .linux;
const is_windows = std.builtin.os.tag == .windows;
const is_darwin = switch (std.builtin.os.tag) {
.macos, .ios, .tvos, .watchos => true,
else => false,
};
const is_posix = std.builtin.link_libc and (is_linux or is_bsd);
const is_bsd = is_linux or is_darwin or switch (std.builtin.os.tag) {
.freebsd, .kfreebsd, .openbsd, .netbsd, .dragonfly => true,
else => false,
};
pub fn spinLoopHint() void {
const hint_asm: []const u8 = switch (std.builtin.arch) {
.i386, .x86_64 => "pause",
.aarch64 => "yield",
else => return,
};
asm volatile(hint_asm ::: "memory");
}
// Lock
pub usingnamespace struct {
pub const Lock = if (is_windows)
WindowsLock
else if (is_darwin)
DarwinLock
else if (is_linux)
LinuxLock
else if (is_posix)
PosixLock
else
@compileError("System unsupported");
const WindowsLock = struct {
srwlock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT,
pub fn tryAcquire(self: *WindowsLock) bool {
const status = std.os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock);
return status != std.os.windows.FALSE;
}
pub fn acquire(self: *WindowsLock) void {
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock);
}
pub fn release(self: *WindowsLock) void {
std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.srwlock);
}
};
const DarwinLock = extern struct {
os_unfair_lock: u32 = 0,
extern fn os_unfair_lock_trylock(lock: *DarwinLock) bool;
extern fn os_unfair_lock_lock(lock: *DarwinLock) void;
extern fn os_unfair_lock_unlock(lock: *DarwinLock) void;
pub fn tryAcquire(self: *DarwinLock) bool {
return os_unfair_lock_trylock(&self.os_unfair_lock);
}
pub fn acquire(self: *DarwinLock) void {
os_unfair_lock_lock(&self.os_unfair_lock);
}
pub fn release(self: *DarwinLock) void {
os_unfair_lock_unlock(&self.os_unfair_lock);
}
};
const LinuxLock = struct {
state: State = .unlocked,
const State = enum(i32) {
unlocked = 0,
locked,
contended,
};
pub fn tryAcquire(self: *LinuxLock) bool {
return @cmpxchgStrong(
State,
&self.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
) == null;
}
pub fn acquire(self: *LinuxLock) void {
if (@cmpxchgWeak(
State,
&self.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
)) |_| {
self.acquireSlow();
}
}
fn acquireSlow(self: *LinuxLock) void {
@setCold(true);
var spin: u8 = 10;
var new_state = State.locked;
var state = @atomicLoad(State, &self.state, .Monotonic);
while (true) {
if (state == .unlocked) {
state = @cmpxchgWeak(State, &self.state, state, new_state, .Acquire, .Monotonic) orelse return;
spinLoopHint();
continue;
}
if (state == .locked and spin > 0) {
spin -= 1;
spinLoopHint();
state = @atomicLoad(State, &self.state, .Monotonic);
continue;
}
if (state != .contended) {
if (@cmpxchgWeak(State, &self.state, state, .contended, .Monotonic, .Monotonic)) |updated| {
spinLoopHint();
state = updated;
continue;
}
}
switch (std.os.linux.getErrno(std.os.linux.futex_wait(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT,
@enumToInt(State.contended),
null,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
std.os.ETIMEDOUT => unreachable,
else => unreachable,
}
spin = 5;
new_state = .contended;
state = @atomicLoad(State, &self.state, .Monotonic);
}
}
pub fn release(self: *LinuxLock) void {
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
.contended => self.releaseSlow(),
}
}
fn releaseSlow(self: *LinuxLock) void {
@setCold(true);
switch (std.os.linux.getErrno(std.os.linux.futex_wake(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE,
1,
))) {
0 => {},
std.os.EINVAL => {},
std.os.EACCES => {},
std.os.EFAULT => {},
else => unreachable,
}
}
};
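// PosixLock: a word lock. Bit 0 is the LOCKED flag, bit 1 is a WAKING token
// that serializes wakeups, and the remaining bits hold a pointer to an
// intrusive list of waiting threads, each parked on its own Event.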
const PosixLock = struct {
state: usize = UNLOCKED,
const UNLOCKED = 0;
const LOCKED: usize = 1 << 0;
const WAKING: usize = 1 << 1;
const WAITING = ~(LOCKED | WAKING);
const Waiter = struct {
prev: ?*Waiter,
next: ?*Waiter,
tail: ?*Waiter,
event: Event align(std.math.max(~WAITING + 1, @alignOf(Event))),
};
pub fn tryAcquire(self: *PosixLock) bool {
var state = @atomicLoad(usize, &self.state, .Monotonic);
while (true) {
if (state & LOCKED != 0) {
return false;
}
state = @cmpxchgWeak(
usize,
&self.state,
state,
state | LOCKED,
.Acquire,
.Monotonic,
) orelse return true;
}
}
pub fn acquire(self: *PosixLock) void {
if (@cmpxchgWeak(
usize,
&self.state,
UNLOCKED,
LOCKED,
.Acquire,
.Monotonic,
)) |_| {
self.acquireSlow();
}
}
fn acquireSlow(self: *PosixLock) void {
@setCold(true);
var spin: u8 = 10;
var has_event = false;
var waiter: Waiter = undefined;
var state = @atomicLoad(usize, &self.state, .Monotonic);
while (true) {
if (state & LOCKED == 0) {
state = @cmpxchgWeak(usize, &self.state, state, state | LOCKED, .Acquire, .Monotonic) orelse break;
spinLoopHint();
continue;
}
const head = @intToPtr(?*Waiter, state & WAITING);
if (head == null and spin > 0) {
spin -= 1;
spinLoopHint();
state = @atomicLoad(usize, &self.state, .Monotonic);
continue;
}
if (!has_event) {
has_event = true;
waiter.event.init();
}
waiter.next = head;
waiter.prev = null;
waiter.tail = if (head == null) &waiter else null;
if (@cmpxchgWeak(
usize,
&self.state,
state,
(state & ~WAITING) | @ptrToInt(&waiter),
.Release,
.Monotonic,
)) |updated| {
spinLoopHint();
state = updated;
continue;
}
spin = 5;
waiter.event.wait(null) catch unreachable;
waiter.event.reset();
state = @atomicLoad(usize, &self.state, .Monotonic);
}
if (has_event) {
waiter.event.deinit();
}
}
pub fn release(self: *PosixLock) void {
const state = @atomicRmw(usize, &self.state, .Sub, LOCKED, .Release);
if (state & WAITING != 0) {
self.releaseSlow();
}
}
fn releaseSlow(self: *PosixLock) void {
var state = @atomicLoad(usize, &self.state, .Monotonic);
while (true) {
if ((state & WAITING == 0) or (state & (LOCKED | WAKING) != 0)) {
return;
}
state = @cmpxchgWeak(
usize,
&self.state,
state,
state | WAKING,
.Acquire,
.Monotonic,
) orelse break;
}
state |= WAKING;
dequeue: while (true) {
const head = @intToPtr(*Waiter, state & WAITING);
const tail = head.tail orelse blk: {
var current = head;
while (true) {
const next = current.next orelse unreachable;
next.prev = current;
current = next;
if (current.tail) |tail| {
head.tail = tail;
break :blk tail;
}
}
};
while (state & LOCKED != 0) : (spinLoopHint()) {
state = @cmpxchgWeak(usize, &self.state, state, state & ~WAKING, .Release, .Monotonic) orelse return;
if (state & LOCKED == 0) {
@fence(.Acquire);
continue :dequeue;
}
}
if (tail.prev) |new_tail| {
head.tail = new_tail;
_ = @atomicRmw(usize, &self.state, .And, ~WAKING, .Release);
} else {
while (true) : (spinLoopHint()) {
state = @cmpxchgWeak(usize, &self.state, state, state & LOCKED, .Release, .Monotonic) orelse break;
if (state & WAITING != 0) {
@fence(.Acquire);
continue :dequeue;
}
}
}
tail.event.set();
return;
}
}
};
};
// Once
pub usingnamespace struct {
pub const Once = if (is_windows)
WindowsOnce
else if (is_darwin)
DarwinOnce
else if (is_linux)
LinuxOnce
else if (is_posix)
PosixOnce
else
@compileError("System unsupported");
const WindowsOnce = struct {
once: std.os.windows.INIT_ONCE = std.os.windows.INIT_ONCE_STATIC_INIT,
pub fn call(self: *WindowsOnce, comptime onceFn: fn () void) void {
const Wrapper = struct {
fn function(
once: *std.os.windows.INIT_ONCE,
parameter: ?std.os.windows.PVOID,
context: ?std.os.windows.PVOID,
) callconv(std.os.windows.WINAPI) std.os.windows.BOOL {
onceFn();
return std.os.windows.TRUE;
}
};
const status = std.os.windows.InitOnceExecuteOnce(&self.once, Wrapper.function, null, null);
if (status == std.os.windows.FALSE) {
const err = std.os.windows.unexpectedError(std.os.windows.kernel32.GetLastError());
unreachable;
}
}
};
const DarwinOnce = struct {
once: dispatch_once_t = 0,
const dispatch_once_t = usize;
const dispatch_function_t = fn (?*c_void) callconv(.C) void;
extern fn dispatch_once_f(
predicate: *dispatch_once_t,
context: ?*c_void,
function: dispatch_function_t,
) void;
pub fn call(self: *DarwinOnce, comptime onceFn: fn () void) void {
const Wrapper = struct {
fn function(_: ?*c_void) callconv(.C) void {
return onceFn();
}
};
dispatch_once_f(&self.once, null, Wrapper.function);
}
};
const LinuxOnce = struct {
state: State = .uninit,
const State = enum(i32) {
uninit,
calling,
waiting,
init,
};
pub fn call(self: *LinuxOnce, comptime onceFn: fn () void) void {
const state = @atomicLoad(State, &self.state, .Acquire);
if (state != .init) {
self.callSlow(state, onceFn);
}
}
fn callSlow(self: *LinuxOnce, current_state: State, comptime onceFn: fn() void) void {
@setCold(true);
var spin: u8 = 10;
var state = current_state;
while (true) {
if (state == .init) {
return;
}
if (state == .uninit) {
state = @cmpxchgWeak(State, &self.state, .uninit, .calling, .Acquire, .Acquire) orelse break;
spinLoopHint();
continue;
}
if (state == .calling) {
if (spin > 0) {
spin -= 1;
spinLoopHint();
state = @atomicLoad(State, &self.state, .Acquire);
continue;
}
if (@cmpxchgWeak(State, &self.state, .calling, .waiting, .Acquire, .Acquire)) |updated| {
spinLoopHint();
state = updated;
continue;
}
}
switch (std.os.linux.getErrno(std.os.linux.futex_wait(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT,
@enumToInt(State.waiting),
null,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
std.os.ETIMEDOUT => unreachable,
else => unreachable,
}
state = @atomicLoad(State, &self.state, .Acquire);
}
onceFn();
state = @atomicRmw(State, &self.state, .Xchg, .init, .Release);
std.debug.assert(state == .calling or state == .waiting);
if (state == .waiting) {
switch (std.os.linux.getErrno(std.os.linux.futex_wake(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE,
std.math.maxInt(i32),
))) {
0 => {},
std.os.EINVAL => {},
std.os.EACCES => unreachable,
std.os.EFAULT => unreachable,
else => unreachable,
}
}
}
};
const PosixOnce = struct {
state: usize = UNINIT,
const UNINIT = 0;
const INIT = 1;
const Waiter = struct {
state: usize align(std.math.max(2, @alignOf(usize))),
event: Event,
};
pub fn call(self: *PosixOnce, comptime onceFn: fn () void) void {
const state = @atomicLoad(usize, &self.state, .Acquire);
if (state != INIT) {
self.callSlow(state, onceFn);
}
}
fn callSlow(self: *PosixOnce, current_state: usize, comptime onceFn: fn() void) void {
@setCold(true);
var spin: u8 = 10;
var state = current_state;
var waiter: Waiter = undefined;
var has_event = false;
defer if (has_event) {
waiter.event.deinit();
};
while (true) {
if (state == INIT) {
return;
}
if (state != UNINIT) {
if (spin > 0) {
spin -= 1;
spinLoopHint();
state = @atomicLoad(usize, &self.state, .Acquire);
continue;
}
if (!has_event) {
has_event = true;
waiter.event.init();
}
}
waiter.state = state;
state = @cmpxchgWeak(
usize,
&self.state,
state,
@ptrToInt(&waiter),
.AcqRel,
.Acquire,
) orelse break;
}
if (state != UNINIT) {
waiter.event.wait(null) catch unreachable;
return;
}
onceFn();
state = @atomicRmw(usize, &self.state, .Xchg, INIT, .AcqRel);
std.debug.assert(state != UNINIT and state != INIT);
while (@intToPtr(?*Waiter, state & ~@as(usize, 1))) |pending_waiter| {
if (pending_waiter == &waiter) break;
state = pending_waiter.state;
pending_waiter.event.set();
}
}
};
};
// Thread
pub usingnamespace struct {
pub const Thread = if (is_windows)
WindowsThread
else if (is_posix)
PosixThread
else if (is_linux)
LinuxThread
else
@compileError("System unsupported");
pub const ThreadConfig = struct {
stack_size: usize,
};
const WindowsThread = struct {
handle: std.os.windows.HANDLE,
pub fn spawn(config: ThreadConfig, comptime entryFn: fn(usize) void, parameter: usize) !WindowsThread {
const Wrapper = struct {
fn entry(raw_arg: std.os.windows.LPVOID) callconv(.C) std.os.windows.DWORD {
entryFn(@ptrToInt(raw_arg));
return 0;
}
};
const handle = std.os.windows.kernel32.CreateThread(
null,
std.math.max(64 * 1024, config.stack_size),
Wrapper.entry,
@intToPtr(?std.os.windows.LPVOID, parameter),
0,
null,
) orelse return error.SpawnError;
return WindowsThread{ .handle = handle };
}
pub fn join(self: WindowsThread) void {
std.os.windows.WaitForSingleObjectEx(self.handle, std.os.windows.INFINITE, false) catch unreachable;
std.os.windows.CloseHandle(self.handle);
}
};
const PosixThread = struct {
handle: std.c.pthread_t,
pub fn spawn(config: ThreadConfig, comptime entryFn: fn(usize) void, parameter: usize) !PosixThread {
const Wrapper = struct {
fn entry(raw_arg: ?*c_void) callconv(.C) ?*c_void {
entryFn(@ptrToInt(raw_arg));
return null;
}
};
var attr: std.c.pthread_attr_t = undefined;
if (std.c.pthread_attr_init(&attr) != 0) return error.SystemResources;
defer std.debug.assert(std.c.pthread_attr_destroy(&attr) == 0);
const stack_size = std.math.max(16 * 1024, config.stack_size);
if (std.c.pthread_attr_setstacksize(&attr, stack_size) != 0) {
return error.SystemResources;
}
var handle: std.c.pthread_t = undefined;
const rc = std.c.pthread_create(
&handle,
&attr,
Wrapper.entry,
@intToPtr(?*c_void, parameter),
);
return switch (rc) {
0 => PosixThread{ .handle = handle },
else => error.SpawnError,
};
}
pub fn join(self: PosixThread) void {
const rc = std.c.pthread_join(self.handle, null);
std.debug.assert(rc == 0);
}
};
const LinuxThread = struct {
info: *Info,
const Info = struct {
mmap_ptr: usize,
mmap_len: usize,
parameter: usize,
handle: i32,
};
pub fn spawn(config: ThreadConfig, comptime entryFn: fn(usize) void, parameter: usize) !LinuxThread {
var mmap_size: usize = std.mem.page_size;
const guard_end = mmap_size;
mmap_size = std.mem.alignForward(mmap_size + config.stack_size, std.mem.page_size);
const stack_end = mmap_size;
mmap_size = std.mem.alignForward(mmap_size, @alignOf(Info));
const info_begin = mmap_size;
mmap_size = std.mem.alignForward(mmap_size + @sizeOf(Info), std.os.linux.tls.tls_image.alloc_align);
const tls_begin = mmap_size;
mmap_size = std.mem.alignForward(mmap_size + std.os.linux.tls.tls_image.alloc_size, std.mem.page_size);
const mmap_bytes = try std.os.mmap(
null,
mmap_size,
std.os.PROT_NONE,
std.os.MAP_PRIVATE | std.os.MAP_ANONYMOUS,
-1,
0,
);
errdefer std.os.munmap(mmap_bytes);
try std.os.mprotect(
mmap_bytes[guard_end..],
std.os.PROT_READ | std.os.PROT_WRITE,
);
const info = @ptrCast(*Info, @alignCast(@alignOf(Info), &mmap_bytes[info_begin]));
info.* = .{
.mmap_ptr = @ptrToInt(mmap_bytes.ptr),
.mmap_len = mmap_bytes.len,
.parameter = parameter,
.handle = undefined,
};
var user_desc: switch (std.builtin.arch) {
.i386 => std.os.linux.user_desc,
else => void,
} = undefined;
var tls_ptr = std.os.linux.tls.prepareTLS(mmap_bytes[tls_begin..]);
if (std.builtin.arch == .i386) {
defer tls_ptr = @ptrToInt(&user_desc);
user_desc = .{
.entry_number = std.os.linux.tls.tls_image.gdt_entry_number,
.base_addr = tls_ptr,
.limit = 0xfffff,
.seg_32bit = 1,
.contents = 0,
.read_exec_only = 0,
.limit_in_pages = 1,
.seg_not_present = 0,
.useable = 1,
};
}
const flags: u32 =
std.os.CLONE_SIGHAND | std.os.CLONE_SYSVSEM |
std.os.CLONE_VM | std.os.CLONE_FS | std.os.CLONE_FILES |
std.os.CLONE_PARENT_SETTID | std.os.CLONE_CHILD_CLEARTID |
std.os.CLONE_THREAD | std.os.CLONE_DETACHED | std.os.CLONE_SETTLS;
const Wrapper = struct {
fn entry(raw_arg: usize) callconv(.C) u8 {
const info_ptr = @intToPtr(*Info, raw_arg);
entryFn(info_ptr.parameter);
return 0;
}
};
const rc = std.os.linux.clone(
Wrapper.entry,
@ptrToInt(&mmap_bytes[stack_end]),
flags,
@ptrToInt(info),
&info.handle,
tls_ptr,
&info.handle,
);
return switch (std.os.linux.getErrno(rc)) {
0 => LinuxThread{ .info = info },
else => error.SpawnError,
};
}
pub fn join(self: LinuxThread) void {
while (true) {
const tid = @atomicLoad(i32, &self.info.handle, .SeqCst);
if (tid == 0) {
std.os.munmap(@intToPtr([*]align(std.mem.page_size) u8, self.info.mmap_ptr)[0..self.info.mmap_len]);
return;
}
const rc = std.os.linux.futex_wait(&self.info.handle, std.os.linux.FUTEX_WAIT, tid, null);
switch (std.os.linux.getErrno(rc)) {
0 => continue,
std.os.EINTR => continue,
std.os.EAGAIN => continue,
else => unreachable,
}
}
}
};
};
// Event
pub usingnamespace struct {
pub const Event = if (is_windows)
WindowsEvent
else if (is_darwin)
DarwinEvent
else if (is_linux)
LinuxEvent
else if (is_posix)
PosixEvent
else
@compileError("System unsupported");
const WindowsEvent = struct {
pub fn init(self: *WindowsEvent) void {
@compileError("TODO");
}
pub fn deinit(self: *WindowsEvent) void {
@compileError("TODO");
}
pub fn wait(self: *WindowsEvent, timeout: ?u64) void {
@compileError("TODO");
}
pub fn set(self: *WindowsEvent) void {
@compileError("TODO");
}
pub fn reset(self: *WindowsEvent) void {
@compileError("TODO");
}
};
const LinuxEvent = struct {
pub fn init(self: *LinuxEvent) void {
@compileError("TODO");
}
pub fn deinit(self: *LinuxEvent) void {
@compileError("TODO");
}
pub fn wait(self: *LinuxEvent, timeout: ?u64) void {
@compileError("TODO");
}
pub fn set(self: *LinuxEvent) void {
@compileError("TODO");
}
pub fn reset(self: *LinuxEvent) void {
@compileError("TODO");
}
};
const DarwinEvent = struct {
// pthread_key -> union(enum){ dispatch_semaphore_t, PosixResetEvent }
pub fn init(self: *DarwinEvent) void {
@compileError("TODO");
}
pub fn deinit(self: *DarwinEvent) void {
@compileError("TODO");
}
pub fn wait(self: *DarwinEvent, timeout: ?u64) void {
@compileError("TODO");
}
pub fn set(self: *DarwinEvent) void {
@compileError("TODO");
}
pub fn reset(self: *DarwinEvent) void {
@compileError("TODO");
}
};
const PosixEvent = struct {
pub fn init(self: *PosixEvent) void {
}
pub fn deinit(self: *PosixEvent) void {
}
pub fn wait(self: *PosixEvent, timeout: ?u64) error{TimedOut}!void {
}
pub fn set(self: *PosixEvent) void {
}
pub fn reset(self: *PosixEvent) void {
}
};
};
};
const Node = struct {
next: ?*Node,
pub const Batch = struct {
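// NOTE: unfinished stub; the queues below expect the usual head/tail/len
// fields plus from/pop/push helpers (compare Node.Batch further up).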
};
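// GlobalQueue: essentially a Treiber-style lock-free stack of Nodes; bit 0
// of `stack` (IS_POPPING) is reserved so a single consumer can claim the
// pop side.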
pub const GlobalQueue = struct {
stack: usize = 0,
const IS_POPPING: usize = 1 << 0;
pub fn push(self: *GlobalQueue, batch: Batch) void {
if (batch.len == 0) {
return;
}
var stack = @atomicLoad(usize, &self.stack, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
batch.tail.next = @intToPtr(?*Node, stack & ~IS_POPPING);
stack = @cmpxchgWeak(
usize,
&self.stack,
stack,
@ptrToInt(batch.head) | (stack & IS_POPPING),
.Release,
.Monotonic,
) orelse return;
}
}
};
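// LocalQueue: a bounded single-producer ring buffer (see the paper linked
// below) with an overflow stack; stealers take about half of the buffer or
// the whole overflow list.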
/// https://fzn.fr/readings/ppopp13.pdf
pub const LocalQueue = struct {
head: Index = 0,
tail: Index = 0,
overflow: ?*Node = null,
buffer: [capacity]*Node = undefined,
const Index = u8;
const capacity = 128;
comptime {
std.debug.assert(std.math.maxInt(Index) >= capacity);
}
const is_x86 = switch (std.builtin.arch) {
.i386, .x86_64 => true,
else => false,
};
pub fn push(self: *LocalQueue, _batch: Batch) void {
var batch = _batch;
if (batch.len == 0) {
return;
}
push_buffer: {
if (batch.len >= self.buffer.len) {
break :push_buffer;
}
var tail = self.tail;
var head = @atomicLoad(Index, &self.head, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
const occupied = tail -% head;
if (batch.len <= (self.buffer.len - occupied)) {
while (batch.pop()) |node| {
@atomicStore(*Node, &self.buffer[tail % self.buffer.len], node, .Unordered);
tail +%= 1;
}
@atomicStore(Index, &self.tail, tail, .Release);
return;
}
if (occupied <= (self.buffer.len / 2)) {
break :push_buffer;
}
const new_head = head +% (occupied / 2);
if (@cmpxchgWeak(Index, &self.head, head, new_head, .Acquire, .Monotonic)) |updated| {
head = updated;
continue;
}
while (head != new_head) : (head +%= 1) {
const node = self.buffer[head % self.buffer.len];
batch.push(.{ .lifo = Batch.from(node) });
}
break :push_buffer;
}
}
var overflow = @atomicLoad(?*Node, &self.overflow, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
batch.tail.next = overflow;
if (overflow == null) {
@atomicStore(?*Node, &self.overflow, batch.head, .Release);
return;
}
overflow = @cmpxchgWeak(
?*Node,
&self.overflow,
overflow,
batch.head,
.Release,
.Monotonic,
) orelse return;
}
}
pub const Pop = struct {
node: *Node,
pushed: bool = false,
};
pub fn pop(self: *LocalQueue, target: anytype) ?Pop {
switch (@TypeOf(target)) {
*LocalQueue => {
if (self == target) {
return self.popLocal();
} else {
return self.popAndStealLocal(target);
}
},
*GlobalQueue => return self.popAndStealGlobal(target),
else => |t| @compileError("Cannot pop() from " ++ @typeName(t)),
}
}
fn popLocal(self: *LocalQueue) ?Pop {
var tail = self.tail;
var head = @atomicLoad(Index, &self.head, .Monotonic);
while (head != tail) : (std.Thread.spinLoopHint()) {
if (is_x86) {
const new_tail = tail -% 1;
@atomicStore(Index, &self.tail, new_tail, .SeqCst);
head = @atomicLoad(Index, &self.head, .Monotonic);
if (head != tail) {
const node = self.buffer[new_tail % self.buffer.len];
if (head != new_tail) {
return Pop{ .node = node };
}
if (@cmpxchgStrong(Index, &self.head, head, tail, .SeqCst, .Monotonic) == null) {
return Pop{ .node = node };
}
}
@atomicStore(Index, &self.tail, tail, .Monotonic);
break;
} else {
head = @cmpxchgWeak(Index, &self.head, head, head +% 1, .Monotonic, .Monotonic) orelse {
const node = self.buffer[head % self.buffer.len];
return Pop{ .node = node };
};
}
}
var overflow = @atomicLoad(?*Node, &self.overflow, .Monotonic);
if (overflow != null) {
overflow = switch (is_x86) {
true => @atomicRmw(?*Node, &self.overflow, .Xchg, null, .Monotonic),
else => @cmpxchgStrong(?*Node, &self.overflow, overflow, null, .Monotonic, .Monotonic) orelse overflow,
};
}
const node = overflow orelse return null;
overflow = node.next orelse return Pop{ .node = node };
var slots = self.buffer.len - (tail -% head);
while (slots > 0) : (slots -= 1) {
const overflow_node = overflow orelse break;
overflow = overflow_node.next;
@atomicStore(*Node, &self.buffer[tail % self.buffer.len], overflow_node, .Unordered);
tail +%= 1;
}
@atomicStore(Index, &self.tail, tail, .Release);
if (overflow != null) {
@atomicStore(?*Node, &self.overflow, overflow, .Release);
}
return Pop{
.node = node,
.pushed = true,
};
}
fn popAndStealLocal(noalias self: *LocalQueue, noalias target: *LocalQueue) ?Pop {
@setCold(true);
const tail = self.tail;
var target_head = @atomicLoad(Index, &target.head, .Acquire);
while (true) {
const target_tail = @atomicLoad(Index, &target.tail, .Acquire);
const target_size = target_tail -% target_head;
if (target_size == 0 or target_size == std.math.maxInt(Index)) {
var node = @atomicLoad(?*Node, &target.overflow, .Monotonic) orelse return null;
node = switch (is_x86) {
true => @atomicRmw(?*Node, &target.overflow, .Xchg, null, .Acquire) orelse return null,
else => blk: {
_ = @cmpxchgWeak(?*Node, &target.overflow, node, null, .Acquire, .Monotonic) orelse break :blk node;
std.Thread.spinLoopHint();
target_head = @atomicLoad(Index, &target.head, .Acquire);
continue;
},
};
var overflow: ?*Node = node.next orelse return Pop{ .node = node };
var new_tail = tail;
const head = @atomicLoad(Index, &self.head, .Monotonic);
while (new_tail -% head < self.buffer.len) {
const overflow_node = overflow orelse break;
overflow = overflow_node.next;
@atomicStore(*Node, &self.buffer[new_tail % self.buffer.len], overflow_node, .Unordered);
new_tail +%= 1;
}
@atomicStore(Index, &self.tail, new_tail, .Release);
if (overflow != null) {
@atomicStore(?*Node, &self.overflow, overflow, .Release);
}
return Pop{
.node = node,
.pushed = true,
};
}
const head = @atomicLoad(Index, &self.head, .Unordered);
const unoccupied = self.buffer.len - (tail -% head);
const target_steal = @intCast(Index, std.math.min(
unoccupied + 1,
target_size - (target_size / 2),
));
std.debug.assert(target_steal >= 1);
if (target_steal > (target.buffer.len / 2)) {
std.Thread.spinLoopHint();
target_head = @atomicLoad(Index, &target.head, .Acquire);
continue;
}
const node = @atomicLoad(*Node, &target.buffer[target_head % target.buffer.len], .Unordered);
if (target_steal > 1) {
var buffer_steal: Index = 1;
while (buffer_steal < target_steal) : (buffer_steal += 1) {
const store_index = (tail +% buffer_steal) % self.buffer.len;
const steal_index = (target_head +% buffer_steal) % target.buffer.len;
const buffer_node = @atomicLoad(*Node, &target.buffer[steal_index], .Unordered);
@atomicStore(*Node, &self.buffer[store_index], buffer_node, .Unordered);
}
}
if (@cmpxchgWeak(
Index,
&target.head,
target_head,
target_head +% target_steal,
.AcqRel,
.Acquire,
)) |updated| {
std.Thread.spinLoopHint();
target_head = updated;
continue;
}
const new_tail = tail +% (target_steal - 1);
if (tail != new_tail) {
@atomicStore(Index, &self.tail, new_tail, .Release);
}
return Pop{
.node = node,
.pushed = tail != new_tail,
};
}
}
fn popAndStealGlobal(noalias self: *LocalQueue, noalias target: *GlobalQueue) ?Pop {
var overflow = @atomicLoad(?*Node, &self.overflow, .Monotonic);
var node: ?*Node = null;
var tail = self.tail;
var new_tail = tail;
var head = @atomicLoad(Index, &self.head, .Unordered);
while (true) : (std.Thread.spinLoopHint()) {
const occupied = new_tail -% head;
if (node != null and occupied == self.buffer.len) {
head = @atomicLoad(Index, &self.head, .Monotonic);
if ((new_tail -% head) == occupied) {
break;
}
}
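// NOTE: `consumer` below is assumed to be a consumer handle acquired from
// `target` (compare UnboundedQueue.tryAcquireConsumer used later in the
// gist); this draft's GlobalQueue does not implement one yet.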
const consumer_node = consumer.pop() orelse break;
if (node == null) {
node = consumer_node;
} else {
@atomicStore(*Node, &self.buffer[new_tail % self.buffer.len], consumer_node, .Unordered);
new_tail +%= 1;
}
}
if (new_tail != tail) {
@atomicStore(Index, &self.tail, new_tail, .Release);
}
return Pop{
.node = node orelse return null,
.pushed = new_tail != tail,
};
}
};
};
const std = @import("std");
const Loop = @This();
counter: usize = 0,
stack_size: usize,
max_workers: usize,
idle_lock: Lock = .{},
idle_workers: ?*Worker = null,
spawned_workers: ?*Worker = null,
run_queues: [Priority.ARRAY_SIZE]GlobalQueue = [_]GlobalQueue{.{}} ** Priority.ARRAY_SIZE,
pub const RunConfig = struct {
max_threads: ?usize = null,
stack_size: ?usize = null,
};
fn ReturnTypeOf(comptime asyncFn: anytype) type {
return @typeInfo(@TypeOf(asyncFn)).Fn.return_type.?;
}
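// Usage sketch, assuming this file is imported as `Loop` (names below are
// illustrative and not part of the gist):
//
//     const Loop = @import("loop.zig");
//
//     fn mainTask(value: usize) void {
//         std.debug.print("running on the loop: {}\n", .{value});
//     }
//
//     pub fn main() !void {
//         try Loop.run(.{}, mainTask, .{ @as(usize, 42) });
//     }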
pub fn run(config: RunConfig, comptime asyncFn: anytype, args: anytype) !ReturnTypeOf(asyncFn) {
const Args = @TypeOf(args);
const Result = ReturnTypeOf(asyncFn);
const Wrapper = struct {
fn entry(task: *Task, result: *?Result, arguments: Args) void {
suspend task.* = Task.init(@frame());
const res = @call(.{}, asyncFn, arguments);
result.* = res; // TODO: check if this hack is still needed
suspend Worker.getCurrent().?.loop.shutdown();
}
};
var task: Task = undefined;
var result: ?Result = null;
var frame = async Wrapper.entry(&task, &result, args);
runTask(config, &task);
return result orelse error.AsyncFnDidNotComplete;
}
pub fn runTask(config: RunConfig, task: *Task) void {
if (std.builtin.single_threaded)
@compileError("TODO: specialize for single_threaded");
const stack_size = blk: {
const stack_size = config.stack_size orelse 16 * 1024 * 1024;
break :blk std.math.max(std.mem.page_size, stack_size);
};
const max_workers = blk: {
const max_threads = config.max_threads orelse std.Thread.cpuCount() catch 1;
break :blk std.math.max(1, max_threads);
};
var loop = Loop{
.max_workers = max_workers,
.stack_size = stack_size,
};
defer loop.idle_lock.deinit();
loop.run_queues[0].push(Batch.from(task));
loop.counter = (Counter{
.state = .waking,
.notified = false,
.idle = 0,
.spawned = 1,
}).pack();
Worker.run(&loop, null);
}
pub const Task = struct {
next: ?*Task = undefined,
data: usize,
pub fn init(frame: anyframe) Task {
return .{ .data = @ptrToInt(frame) | 0 };
}
pub const Callback = fn(*Task) callconv(.C) void;
pub fn initCallback(callback: Callback) Task {
return .{ .data = @ptrToInt(callback) | 1 };
}
fn run(self: *Task) void {
switch (self.data & 1) {
0 => resume @intToPtr(anyframe, self.data),
1 => @intToPtr(Callback, self.data & ~@as(usize, 1))(self),
else => unreachable,
}
}
};
pub const Priority = enum(u2) {
Low = 0,
Normal = 1,
High = 2,
Handoff = 3,
const ARRAY_SIZE = 3;
fn toArrayIndex(self: Priority) usize {
return switch (self) {
.Handoff, .High => 2,
.Normal => 1,
.Low => 0,
};
}
};
pub fn scheduleRemote(self: *Loop, task: *Task, priority: Priority) void {
self.run_queues[priority.toArrayIndex()].push(Batch.from(task));
self.notifyWorkersWith(null);
}
pub fn schedule(task: *Task, priority: Priority) void {
const worker = Worker.getCurrent() orelse @panic("Loop.schedule called outside of worker thread pool");
worker.schedule(Batch.from(task), priority);
}
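// Counter: the whole scheduler state packed into one usize so it can be
// updated with a single CAS: 2 bits of State, 1 notified bit, then the idle
// and spawned worker counts in the remaining bits.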
const Counter = struct {
state: State = .pending,
notified: bool = false,
idle: usize = 0,
spawned: usize = 0,
const count_bits = @divFloor(std.meta.bitCount(usize) - 3, 2);
const Count = std.meta.Int(.unsigned, count_bits);
const State = enum(u2) {
pending = 0,
waking,
signaled,
shutdown,
};
fn pack(self: Counter) usize {
return (
(@as(usize, @enumToInt(self.state)) << 0) |
(@as(usize, @boolToInt(self.notified)) << 2) |
(@as(usize, @intCast(Count, self.idle)) << 3) |
(@as(usize, @intCast(Count, self.spawned)) << (3 + count_bits))
);
}
fn unpack(value: usize) Counter {
return .{
.state = @intToEnum(State, @truncate(u2, value)),
.notified = value & (1 << 2) != 0,
.idle = @truncate(Count, value >> 3),
.spawned = @truncate(Count, value >> (3 + count_bits)),
};
}
};
fn getWorkerCount(self: *const Loop) usize {
const counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
return counter.spawned;
}
fn getWorkerIter(self: *const Loop) WorkerIter {
const spawned_workers = @atomicLoad(?*Worker, &self.spawned_workers, .Acquire);
return .{ .spawned = spawned_workers };
}
const WorkerIter = struct {
spawned: ?*Worker = null,
fn next(self: *WorkerIter) ?*Worker {
const worker = self.spawned orelse return null;
self.spawned = worker.spawned_next;
return worker;
}
};
fn beginWorkerWith(self: *Loop, worker: *Worker) void {
var spawned_workers = @atomicLoad(?*Worker, &self.spawned_workers, .Monotonic);
while (true) {
worker.spawned_next = spawned_workers;
spawned_workers = @cmpxchgWeak(
?*Worker,
&self.spawned_workers,
spawned_workers,
worker,
.Release,
.Monotonic,
) orelse break;
}
}
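// Wake or spawn a worker after new work is pushed. A single "waking" worker
// is designated at a time: it either signals an idle worker, spawns a new
// thread (up to max_workers), or records a pending notification in the
// counter when neither is possible.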
fn notifyWorkersWith(self: *Loop, worker: ?*Worker) void {
var did_spawn = false;
var spawn_attempts_remaining: u8 = 5;
var is_waking = if (worker) |w| w.state == .waking else false;
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
while (true) {
if (counter.state == .shutdown) {
if (did_spawn)
self.markShutdown();
return;
}
var is_spawning = false;
var is_signaling = false;
var new_counter = counter;
if (is_waking) {
std.debug.assert(counter.state == .waking);
if (counter.idle > 0) {
is_signaling = true;
new_counter.state = .signaled;
if (did_spawn)
new_counter.spawned -= 1;
} else if (spawn_attempts_remaining > 0 and (did_spawn or counter.spawned < self.max_workers)) {
is_spawning = true;
if (!did_spawn)
new_counter.spawned += 1;
} else {
new_counter.notified = true;
new_counter.state = .pending;
if (did_spawn)
new_counter.spawned -= 1;
}
} else {
if (counter.state == .pending and counter.idle > 0) {
is_signaling = true;
new_counter.state = .signaled;
} else if (counter.state == .pending and counter.spawned < self.max_workers) {
is_spawning = true;
new_counter.state = .waking;
new_counter.spawned += 1;
} else if (!counter.notified) {
new_counter.notified = true;
} else {
return;
}
}
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.Release,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
counter = Counter.unpack(updated);
continue;
}
is_waking = true;
if (is_signaling) {
self.notifyOne();
return;
}
if (is_spawning) {
did_spawn = true;
if (Worker.spawn(self))
return;
} else {
return;
}
std.Thread.spinLoopHint();
spawn_attempts_remaining -= 1;
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
}
}
fn suspendWorkerWith(self: *Loop, worker: *Worker) void {
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
while (true) {
if (counter.state == .shutdown) {
worker.state = .shutdown;
self.markShutdown();
return;
}
if (counter.notified or counter.state == .signaled or worker.state != .waiting) {
var new_counter = counter;
new_counter.notified = false;
if (counter.state == .signaled) {
new_counter.state = .waking;
if (worker.state == .waiting)
new_counter.idle -= 1;
} else if (counter.notified) {
if (worker.state == .waking)
new_counter.state = .waking;
if (worker.state == .waiting)
new_counter.idle -= 1;
} else {
if (worker.state == .waking)
new_counter.state = .pending;
if (worker.state != .waiting)
new_counter.idle += 1;
}
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.Acquire,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
counter = Counter.unpack(updated);
continue;
}
if (counter.notified or counter.state == .signaled) {
if (counter.state == .signaled) {
worker.state = .waking;
} else if (worker.state == .waiting) {
worker.state = .running;
}
return;
}
}
worker.state = .waiting;
self.wait(worker);
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
}
}
fn shutdown(self: *Loop) void {
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
while (counter.state != .shutdown) {
var new_counter = counter;
new_counter.state = .shutdown;
new_counter.idle = 0;
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.AcqRel,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
counter = Counter.unpack(updated);
continue;
}
if (counter.idle > 0)
self.notifyAll();
return;
}
}
fn markShutdown(self: *Loop) void {
var counter = Counter{ .spawned = 1 };
counter = Counter.unpack(@atomicRmw(usize, &self.counter, .Sub, counter.pack(), .AcqRel));
std.debug.assert(counter.state == .shutdown);
if (counter.spawned == 1 and counter.idle != 0) {
self.notifyOne();
}
}
fn joinWith(self: *Loop, worker: *Worker) void {
var counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Acquire));
while (true) {
if (counter.spawned == 0)
break;
if (counter.idle == 0) {
var new_counter = counter;
new_counter.idle = 1;
if (@cmpxchgWeak(
usize,
&self.counter,
counter.pack(),
new_counter.pack(),
.Acquire,
.Acquire,
)) |updated| {
std.Thread.spinLoopHint();
counter = Counter.unpack(updated);
continue;
}
}
self.wait(worker);
counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Acquire));
}
var pending_workers = self.getWorkerIter();
while (pending_workers.next()) |pending_worker| {
const thread = pending_worker.thread orelse continue;
pending_worker.notify();
thread.join();
}
}
fn wait(self: *Loop, worker: *Worker) void {
const is_waiting = blk: {
const held = self.idle_lock.acquire();
defer held.release();
const counter = Counter.unpack(@atomicLoad(usize, &self.counter, .Monotonic));
const should_wake = switch (worker.state) {
.shutdown => counter.spawned == 0,
.waiting => counter.notified or counter.state == .signaled or counter.state == .shutdown,
else => unreachable,
};
if (should_wake) {
break :blk false;
}
worker.idle_next = self.idle_workers;
self.idle_workers = worker;
break :blk true;
};
if (is_waiting) {
worker.wait();
}
}
fn notifyOne(self: *Loop) void {
self.wake(false);
}
fn notifyAll(self: *Loop) void {
self.wake(true);
}
fn wake(self: *Loop, wake_all: bool) void {
const worker = blk: {
const held = self.idle_lock.acquire();
defer held.release();
const worker = self.idle_workers orelse {
return;
};
if (wake_all) {
self.idle_workers = null;
} else {
self.idle_workers = worker.idle_next;
worker.idle_next = null;
}
break :blk worker;
};
var idle_workers: ?*Worker = worker;
while (idle_workers) |idle_worker| {
idle_workers = idle_worker.idle_next;
idle_worker.notify();
}
}
const Worker = struct {
loop: *Loop,
thread: ?Thread,
state: State = .waking,
wait_state: usize = 0,
idle_next: ?*Worker = null,
spawned_next: ?*Worker = null,
steal_targets: WorkerIter = .{},
run_next: Batch = .{},
run_queues: [Priority.ARRAY_SIZE]LocalQueue = [_]LocalQueue{.{}} ** Priority.ARRAY_SIZE,
const ThreadLocalWorkerPtr = ThreadLocalUsize(struct{});
const State = enum {
running,
waking,
waiting,
shutdown,
};
fn spawn(loop: *Loop) bool {
const Spawner = struct {
_loop: *Loop = undefined,
_thread: Thread = undefined,
put_event: Event = .{},
got_event: Event = .{},
fn entry(self: *@This()) void {
std.debug.assert(self.put_event.wait(null));
const _loop = self._loop;
const _thread = self._thread;
self.got_event.set();
Worker.run(_loop, _thread);
}
};
var spawner = Spawner{};
defer {
spawner.put_event.deinit();
spawner.got_event.deinit();
}
spawner._loop = loop;
spawner._thread = Thread.spawn(loop.stack_size, &spawner, Spawner.entry) catch return false;
spawner.put_event.set();
std.debug.assert(spawner.got_event.wait(null));
return true;
}
fn run(loop: *Loop, thread: ?Thread) void {
var self = Worker{
.loop = loop,
.thread = thread,
};
loop.beginWorkerWith(&self);
ThreadLocalWorkerPtr.set(@ptrToInt(&self));
var tick = @truncate(u8, @ptrToInt(&self) >> @sizeOf(*Worker));
while (self.state != .shutdown) {
tick +%= 1;
if (self.poll(tick)) |task| {
if (self.state == .waking)
loop.notifyWorkersWith(&self);
self.state = .running;
task.run();
continue;
}
loop.suspendWorkerWith(&self);
}
if (thread == null) {
loop.joinWith(&self);
} else {
self.wait();
}
}
fn getCurrent() ?*Worker {
return @intToPtr(?*Worker, ThreadLocalWorkerPtr.get());
}
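// wait_state packs a WaitState tag into the low 2 bits; while Waiting, the
// remaining bits hold a pointer to the stack-allocated Event the worker is
// parked on.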
const WaitState = enum(u2) {
Empty = 0,
Waiting = 1,
Notified = 3,
};
fn wait(self: *Worker) void {
var event align(std.math.max(@alignOf(Event), 4)) = Event{};
defer event.deinit();
if (@cmpxchgStrong(
usize,
&self.wait_state,
@enumToInt(WaitState.Empty),
@enumToInt(WaitState.Waiting) | @ptrToInt(&event),
.AcqRel,
.Acquire,
)) |updated| {
std.debug.assert(@intToEnum(WaitState, @truncate(u2, updated)) == .Notified);
@atomicStore(usize, &self.wait_state, @enumToInt(WaitState.Empty), .Monotonic);
return;
}
const timeout: ?u64 = null;
const timed_out = !event.wait(timeout);
std.debug.assert(!timed_out);
}
fn notify(self: *Worker) void {
var wait_state = @atomicLoad(usize, &self.wait_state, .Monotonic);
while (true) {
var new_state: WaitState = undefined;
switch (@intToEnum(WaitState, @truncate(u2, wait_state))) {
.Empty => new_state = .Notified,
.Waiting => new_state = .Empty,
.Notified => return,
}
if (@cmpxchgWeak(
usize,
&self.wait_state,
wait_state,
@enumToInt(new_state),
.AcqRel,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
wait_state = updated;
continue;
}
if (new_state == .Empty) {
const EventPtr = *align(std.math.max(@alignOf(Event), 4)) Event;
const event = @intToPtr(EventPtr, wait_state & ~@as(usize, 0b11));
event.set();
}
return;
}
}
fn schedule(self: *Worker, batch: Batch, priority: Priority) void {
if (batch.isEmpty())
return;
if (priority == .Handoff) {
self.run_next.push(batch);
} else {
self.run_queues[priority.toArrayIndex()].push(batch);
}
self.loop.notifyWorkersWith(null);
}
fn poll(self: *Worker, tick: u8) ?*Task {
if (tick % 61 == 0) {
if (self.pollGlobal(null)) |task|
return task;
}
var steal_attempt: u8 = 0;
while (steal_attempt < 4) : (steal_attempt += 1) {
if (self.pollLocal(false)) |task|
return task;
if (self.pollGlobal(steal_attempt)) |task|
return task;
if (self.pollSteal(steal_attempt)) |task|
return task;
}
return null;
}
fn pollLocal(self: *Worker, be_fair: bool) ?*Task {
var priority_order = [_]Priority{ .Handoff, .High, .Normal, .Low };
if (be_fair) {
priority_order = [_]Priority{ .Low, .Normal, .High, .Handoff };
}
for (priority_order) |priority| {
if (switch (priority) {
.Handoff => self.run_next.pop(),
else => self.run_queues[priority.toArrayIndex()].pop(be_fair)
}) |task| {
return task;
}
}
return null;
}
fn pollGlobal(self: *Worker, steal_attempt: ?u8) ?*Task {
return self.pollQueues(&self.loop.run_queues, "popAndStealGlobal", steal_attempt);
}
fn pollSteal(self: *Worker, steal_attempt: ?u8) ?*Task {
var iter = self.loop.getWorkerCount();
while (iter > 0) : (iter -= 1) {
const target = self.steal_targets.next() orelse blk: {
self.steal_targets = self.loop.getWorkerIter();
break :blk self.steal_targets.next() orelse unreachable;
};
if (target == self)
continue;
if (self.pollQueues(&target.run_queues, "popAndStealLocal", steal_attempt)) |task|
return task;
}
return null;
}
fn pollQueues(
self: *Worker,
target_queues: anytype,
comptime popAndStealFn: []const u8,
steal_attempt: ?u8,
) ?*Task {
const priority_order: []const Priority = blk: {
const attempt = steal_attempt orelse {
break :blk &[_]Priority{ .Low, .Normal, .High };
};
break :blk switch (attempt) {
0 => &[_]Priority{ .High },
1 => &[_]Priority{ .High, .Normal },
else => &[_]Priority{ .High, .Normal, .Low },
};
};
for (priority_order) |priority| {
const be_fair = steal_attempt == null;
const local_queue = &self.run_queues[priority.toArrayIndex()];
const target_queue = &target_queues[priority.toArrayIndex()];
if (@field(local_queue, popAndStealFn)(be_fair, target_queue)) |task|
return task;
}
return null;
}
};
fn ThreadLocalUsize(comptime UniqueKey: type) type {
const is_apple_silicon = std.Target.current.isDarwin() and std.builtin.arch == .aarch64;
// For normal platforms, we use the compiler's built-in "threadlocal" keyword.
if (!is_apple_silicon) {
return struct {
threadlocal var tls_value: usize = 0;
pub fn get() usize {
return tls_value;
}
pub fn set(value: usize) void {
tls_value = value;
}
};
}
// For Apple Silicon, LLD currently has some issues that prevent the threadlocal keyword from working correctly.
// So for now we fall back to the OS-provided thread-local mechanism.
return struct {
const pthread_key_t = c_ulong;
const pthread_once_t = extern struct {
__sig: c_long = 0x30B1BCBA,
__opaque: [4]u8 = [_]u8{ 0, 0, 0, 0 },
};
extern "c" fn pthread_once(o: *pthread_once_t, f: ?fn() callconv(.C) void) callconv(.C) c_int;
extern "c" fn pthread_key_create(k: *pthread_key_t, d: ?fn(?*c_void) callconv(.C) void) callconv(.C) c_int;
extern "c" fn pthread_setspecific(k: pthread_key_t, p: ?*c_void) callconv(.C) c_int;
extern "c" fn pthread_getspecific(k: pthread_key_t) callconv(.C) ?*c_void;
var tls_key: pthread_key_t = undefined;
var tls_key_once: pthread_once_t = .{};
fn tls_init() callconv(.C) void {
std.debug.assert(pthread_key_create(&tls_key, null) == 0);
}
pub fn get() usize {
std.debug.assert(pthread_once(&tls_key_once, tls_init) == 0);
return @ptrToInt(pthread_getspecific(tls_key));
}
pub fn set(value: usize) void {
std.debug.assert(pthread_once(&tls_key_once, tls_init) == 0);
std.debug.assert(pthread_setspecific(tls_key, @intToPtr(?*c_void, value)) == 0);
}
};
}
const Batch = struct {
head: ?*Task = null,
tail: *Task = undefined,
fn from(task: *Task) Batch {
task.next = null;
return Batch{
.head = task,
.tail = task,
};
}
fn isEmpty(self: Batch) bool {
return self.head == null;
}
fn push(self: *Batch, batch: Batch) void {
if (self.isEmpty()) {
self.* = batch;
} else if (!batch.isEmpty()) {
self.tail.next = batch.head;
self.tail = batch.tail;
}
}
fn pushFront(self: *Batch, batch: Batch) void {
if (self.isEmpty()) {
self.* = batch;
} else if (!batch.isEmpty()) {
batch.tail.next = self.head;
self.head = batch.head;
}
}
fn pop(self: *Batch) ?*Task {
const task = self.head orelse return null;
self.head = task.next;
return task;
}
};
const GlobalQueue = GlobalQueue6;
const LocalQueue = LocalQueue6;
const GlobalQueue6 = struct {
stack: ?*Task = null,
pub fn push(self: *GlobalQueue, batch: Batch) void {
if (batch.isEmpty()) {
return;
}
batch.tail.next = @atomicLoad(?*Task, &self.stack, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
batch.tail.next = @cmpxchgWeak(
?*Task,
&self.stack,
batch.tail.next,
batch.head,
.Release,
.Monotonic,
) orelse return;
}
}
};
const LocalQueue6 = struct {
head: usize = 0,
tail: usize = 0,
overflow: GlobalQueue = .{},
buffer: [128]*Task = undefined,
fn push(self: *LocalQueue, _batch: Batch) void {
var batch = _batch;
if (batch.isEmpty()) {
return;
}
var tail = self.tail;
var head = @atomicLoad(usize, &self.head, .Monotonic);
while (true) {
if (tail -% head < self.buffer.len) {
while (tail -% head < self.buffer.len) {
const task = batch.pop() orelse break;
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered);
tail +%= 1;
}
@atomicStore(usize, &self.tail, tail, .Release);
if (batch.isEmpty())
return;
std.Thread.spinLoopHint();
head = @atomicLoad(usize, &self.head, .Monotonic);
continue;
}
const new_head = head +% @intCast(usize, self.buffer.len / 2);
if (@cmpxchgWeak(
usize,
&self.head,
head,
new_head,
.Acquire,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
head = updated;
continue;
}
var overflow = Batch{};
while (head != new_head) : (head +%= 1) {
const task = self.buffer[head % self.buffer.len];
overflow.push(Batch.from(task));
}
overflow.push(batch);
batch = overflow;
batch.tail.next = @atomicLoad(?*Task, &self.overflow.stack, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
if (batch.tail.next == null) {
@atomicStore(?*Task, &self.overflow.stack, batch.head, .Release);
return;
}
batch.tail.next = @cmpxchgWeak(
?*Task,
&self.overflow.stack,
batch.tail.next,
batch.head,
.Release,
.Monotonic,
) orelse return;
}
}
}
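// Single-producer LIFO pop: tentatively publish tail-1 with SeqCst so it is
// ordered against concurrent stealers reading head, then race for the last
// item with a CAS on head when the queue looked like it only had one entry.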
fn popLocal(self: *LocalQueue) ?*Task {
const tail = self.tail;
if (@atomicLoad(usize, &self.head, .Monotonic) == tail)
return null;
@atomicStore(usize, &self.tail, tail -% 1, .SeqCst);
const head = @atomicLoad(usize, &self.head, .SeqCst);
if (tail == head) {
@atomicStore(usize, &self.tail, tail, .Release);
return null;
}
var task: ?*Task = self.buffer[(tail -% 1) % self.buffer.len];
if (head != (tail -% 1)) {
return task;
}
if (@cmpxchgStrong(usize, &self.head, head, head +% 1, .AcqRel, .Monotonic) != null)
task = null;
@atomicStore(usize, &self.tail, tail, .Release);
return task;
}
fn popStealLocal(target: *LocalQueue) ?*Task {
while (true) : (std.Thread.spinLoopHint()) {
const target_head = @atomicLoad(usize, &target.head, .Acquire);
const target_tail = @atomicLoad(usize, &target.tail, .Acquire);
if (target_tail == target_head)
return null;
if (target_tail == target_head -% 1)
return null;
if (target_tail -% target_head > target.buffer.len)
return null;
const task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered);
_ = @cmpxchgWeak(
usize,
&target.head,
target_head,
target_head +% 1,
.AcqRel,
.Monotonic,
) orelse {
return task;
};
}
}
fn popStealGlobal(self: *LocalQueue, target: *GlobalQueue) ?*Task {
if (@atomicLoad(?*Task, &target.stack, .Monotonic) == null)
return null;
const task = @atomicRmw(?*Task, &target.stack, .Xchg, null, .Acquire) orelse return null;
var overflow = task.next;
if (overflow != null) {
var tail = self.tail;
var head = @atomicLoad(usize, &self.head, .Monotonic);
if (tail != head)
std.debug.panic("tail != head on steal (t={} h={} d={})\n", .{tail, head, tail -% head});
var i: usize = self.buffer.len;
while (i != 0) : (i -= 1) {
const t = overflow orelse break;
overflow = t.next;
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], t, .Unordered);
tail +%= 1;
}
@atomicStore(usize, &self.tail, tail, .Release);
}
if (overflow != null) {
if (@atomicLoad(?*Task, &self.overflow.stack, .Monotonic) != null)
std.debug.panic("non null overflow stack", .{});
@atomicStore(?*Task, &self.overflow.stack, overflow, .Release);
}
return task;
}
fn pop(self: *LocalQueue, be_fair: bool) ?*Task {
if (self.popLocal()) |task|
return task;
if (self.popStealGlobal(&self.overflow)) |task|
return task;
return null;
}
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task {
return self.popStealGlobal(target);
}
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task {
if (target.popStealLocal()) |task|
return task;
if (self.popStealGlobal(&target.overflow)) |task|
return task;
return null;
}
};
const GlobalQueue5 = LocalQueue;
const LocalQueue5 = struct {
stack: usize = 0,
local: ?*Task = null,
const MASK = IS_POPPING | HAS_LOCALS;
const HAS_LOCALS: usize = 0b01;
const IS_POPPING: usize = 0b10;
fn push(self: *LocalQueue, batch: Batch) void {
if (batch.isEmpty()) {
return;
}
var stack = @atomicLoad(usize, &self.stack, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
batch.tail.next = @intToPtr(?*Task, stack & ~MASK);
stack = @cmpxchgWeak(
usize,
&self.stack,
stack,
@ptrToInt(batch.head) | (stack & MASK),
.Release,
.Monotonic,
) orelse return;
}
}
fn pop(self: *LocalQueue, be_fair: bool) ?*Task {
var stack = @atomicLoad(usize, &self.stack, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
if (stack & ~IS_POPPING == 0)
return null;
if (stack & IS_POPPING != 0)
return null;
var new_stack = stack | IS_POPPING | HAS_LOCALS;
if (stack & HAS_LOCALS == 0) {
new_stack &= MASK;
}
stack = @cmpxchgWeak(
usize,
&self.stack,
stack,
new_stack,
.Acquire,
.Monotonic,
) orelse break;
}
while (true) {
const task = self.local orelse @intToPtr(?*Task, stack & ~MASK) orelse unreachable;
self.local = task.next;
if (self.local != null) {
_ = @atomicRmw(usize, &self.stack, .And, ~IS_POPPING, .Release);
return task;
}
stack = @atomicLoad(usize, &self.stack, .Monotonic);
while (true) : (std.Thread.spinLoopHint()) {
self.local = @intToPtr(?*Task, stack & ~MASK);
var new_stack: usize = 0;
if (self.local != null)
new_stack = HAS_LOCALS;
stack = @cmpxchgWeak(
usize,
&self.stack,
stack,
new_stack,
.Release,
.Monotonic,
) orelse break;
}
return task;
}
}
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task {
return self.popAndStealLocal(be_fair, target);
}
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task {
return target.pop(be_fair);
}
};
const GlobalQueue4 = LocalQueue;
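// LocalQueue4: a try-lock protected LIFO list plus a lock-free `contended`
// stack that pushers fall back to when they lose the race for the lock.
// `has_pending` is a cheap "is there anything here?" hint for pop().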
const LocalQueue4 = struct {
lock: Lock = .{},
local: ?*Task = null,
contended: ?*Task = null,
has_pending: bool = false,
fn push(self: *LocalQueue, batch: Batch) void {
if (batch.isEmpty()) {
return;
}
while (true) : (std.Thread.spinLoopHint()) {
if (self.lock.tryAcquire()) |held| {
@atomicStore(bool, &self.has_pending, true, .Monotonic);
batch.tail.next = self.local;
self.local = batch.head;
held.release();
return;
}
// Push onto the contended stack with a CAS even when it looks empty;
// a plain store here could drop a concurrent pusher's batch.
batch.tail.next = @atomicLoad(?*Task, &self.contended, .Monotonic);
_ = @cmpxchgStrong(
?*Task,
&self.contended,
batch.tail.next,
batch.head,
.Release,
.Monotonic,
) orelse {
@atomicStore(bool, &self.has_pending, true, .Release);
return;
};
}
}
fn pop(self: *LocalQueue, be_fair: bool) ?*Task {
if (!@atomicLoad(bool, &self.has_pending, .Monotonic))
return null;
const held = self.lock.tryAcquire() orelse return null;
defer held.release();
if (!@atomicLoad(bool, &self.has_pending, .Monotonic))
return null;
while (true) {
if (self.local) |task| {
self.local = task.next;
return task;
}
if (@atomicRmw(?*Task, &self.contended, .Xchg, null, .Acquire)) |contended| {
self.local = contended;
continue;
}
if (@cmpxchgStrong(bool, &self.has_pending, true, false, .Acquire, .Monotonic) != null) {
return null;
}
}
}
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task {
return self.popAndStealLocal(be_fair, target);
}
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task {
return target.pop(be_fair);
}
};
const GlobalQueue3 = LocalQueue;
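// LocalQueue3: the simplest variant - a mutex-protected Batch with a
// `has_pending` flag so empty queues can be skipped without taking the lock.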
const LocalQueue3 = struct {
lock: Lock = .{},
batch: Batch = .{},
has_pending: bool = false,
fn push(self: *LocalQueue, batch: Batch) void {
if (batch.isEmpty()) {
return;
}
const held = self.lock.acquire();
defer held.release();
@atomicStore(bool, &self.has_pending, true, .Monotonic);
self.batch.pushFront(batch);
}
fn pop(self: *LocalQueue, be_fair: bool) ?*Task {
if (!@atomicLoad(bool, &self.has_pending, .Monotonic))
return null;
const held = self.lock.tryAcquire() orelse return null;
defer held.release();
if (!@atomicLoad(bool, &self.has_pending, .Monotonic))
return null;
return self.batch.pop() orelse {
@atomicStore(bool, &self.has_pending, false, .Monotonic);
return null;
};
}
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task {
return self.popAndStealLocal(be_fair, target);
}
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task {
return target.pop(be_fair);
}
};
const GlobalQueue2 = LocalQueue;
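// LocalQueue2: a thin wrapper over the MPSC UnboundedQueue; pop() only
// succeeds for the thread that acquires the single-consumer token.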
const LocalQueue2 = struct {
queue: UnboundedQueue = .{},
fn push(self: *LocalQueue, batch: Batch) void {
self.queue.push(batch);
}
fn pop(self: *LocalQueue, be_fair: bool) ?*Task {
var consumer = self.queue.tryAcquireConsumer() orelse return null;
defer consumer.release();
return consumer.pop();
}
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task {
return self.popAndStealLocal(be_fair, target);
}
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task {
return target.pop(be_fair);
}
};
const GlobalQueue1 = UnboundedQueue;
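// LocalQueue1: the Go-style scheme - a fixed-size BoundedQueue ring buffer
// per worker, plus an UnboundedQueue `overflow` list that absorbs pushes
// when the ring is full and is drained back into it on pop.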
const LocalQueue1 = struct {
buffer: BoundedQueue = .{},
overflow: UnboundedQueue = .{},
fn push(self: *LocalQueue, batch: Batch) void {
if (self.buffer.push(batch)) |overflowed|
self.overflow.push(overflowed);
}
fn pop(self: *LocalQueue, be_fair: bool) ?*Task {
if (be_fair) {
if (self.buffer.popAndStealUnbounded(&self.overflow)) |task|
return task;
}
if (self.buffer.pop()) |task|
return task;
if (self.buffer.popAndStealUnbounded(&self.overflow)) |task|
return task;
return null;
// return self.buffer.pop() orelse self.buffer.popAndStealUnbounded(&self.overflow);
// if (be_fair) {
// if (self.buffer.pop()) |task|
// return task;
// }
// if (self.buffer.popAndStealUnbounded(&self.overflow)) |task|
// return task;
// if (self.buffer.pop()) |task|
// return task;
// return null;
}
fn popAndStealGlobal(self: *LocalQueue, be_fair: bool, target: *GlobalQueue) ?*Task {
return self.buffer.popAndStealUnbounded(target);
}
fn popAndStealLocal(self: *LocalQueue, be_fair: bool, target: *LocalQueue) ?*Task {
if (self.buffer.popAndStealBounded(&target.buffer)) |task|
return task;
if (self.buffer.popAndStealUnbounded(&target.overflow)) |task|
return task;
return null;
// if (self == target)
// return self.pop(be_fair);
// if (be_fair) {
// if (self.buffer.popAndStealBounded(&target.buffer)) |task|
// return task;
// }
// if (self.buffer.popAndStealUnbounded(&target.overflow)) |task|
// return task;
// if (self.buffer.popAndStealBounded(&target.buffer)) |task|
// return task;
// return null;
}
};
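// An intrusive MPSC queue (Vyukov-style): producers push with a single Xchg
// on `head`, and the low bit of `tail` doubles as the consumer lock so only
// one thread dequeues at a time.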
const UnboundedQueue = struct {
head: ?*Task = null,
tail: usize = 0,
stub: Task = .{
.next = null,
.data = undefined,
},
fn push(self: *UnboundedQueue, batch: Batch) void {
if (batch.isEmpty())
return;
const head = @atomicRmw(?*Task, &self.head, .Xchg, batch.tail, .AcqRel);
const prev = head orelse &self.stub;
@atomicStore(?*Task, &prev.next, batch.head, .Release);
}
fn tryAcquireConsumer(self: *UnboundedQueue) ?Consumer {
// var head = @atomicLoad(?*Task, &self.head, .Monotonic);
// if (head == null or head == &self.stub)
// return null;
// var tail = @atomicRmw(usize, &self.tail, .Xchg, 1, .Acquire);
// if (tail & 1 != 0)
// return null;
while (true) : (std.Thread.spinLoopHint()) {
const head = @atomicLoad(?*Task, &self.head, .Monotonic);
if (head == null or head == &self.stub)
return null;
const tail = @atomicLoad(usize, &self.tail, .Monotonic);
if (tail & 1 != 0)
return null;
_ = @cmpxchgWeak(
usize,
&self.tail,
tail,
tail | 1,
.Acquire,
.Monotonic,
) orelse return Consumer{
.queue = self,
.tail = @intToPtr(?*Task, tail) orelse &self.stub,
};
}
// var tail = @atomicLoad(usize, &self.tail, .Monotonic);
// while (true) : (std.Thread.spinLoopHint()) {
// const head = @atomicLoad(?*Task, &self.head, .Monotonic);
// if (head == null or head == &self.stub)
// return null;
// if (tail & 1 != 0)
// return null;
// tail = @cmpxchgWeak(
// usize,
// &self.tail,
// tail,
// tail | 1,
// .Acquire,
// .Monotonic,
// ) orelse return Consumer{
// .queue = self,
// .tail = @intToPtr(?*Task, tail) orelse &self.stub,
// };
// }
}
const Consumer = struct {
queue: *UnboundedQueue,
tail: *Task,
fn release(self: Consumer) void {
@atomicStore(usize, &self.queue.tail, @ptrToInt(self.tail), .Release);
}
fn pop(self: *Consumer) ?*Task {
var tail = self.tail;
var next = @atomicLoad(?*Task, &tail.next, .Acquire);
if (tail == &self.queue.stub) {
tail = next orelse return null;
self.tail = tail;
next = @atomicLoad(?*Task, &tail.next, .Acquire);
}
if (next) |task| {
self.tail = task;
return tail;
}
const head = @atomicLoad(?*Task, &self.queue.head, .Monotonic);
if (tail != head) {
return null;
}
self.queue.push(Batch.from(&self.queue.stub));
if (@atomicLoad(?*Task, &tail.next, .Acquire)) |task| {
self.tail = task;
return tail;
}
return null;
}
};
};
const BoundedQueue = Bounded1;
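// Bounded2: a ring buffer where the owning thread pops LIFO from the tail,
// using a SeqCst store/load handshake plus a head cmpxchg to race stealers
// for the last task, while other threads steal one task at a time from head.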
const Bounded2 = struct {
head: Pos = 0,
tail: Pos = 0,
buffer: [capacity]*Task = undefined,
const Pos = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2);
const capacity = 128;
comptime {
std.debug.assert(capacity <= std.math.maxInt(Pos));
}
fn push(self: *BoundedQueue, _batch: Batch) ?Batch {
var batch = _batch;
if (batch.isEmpty()) {
return null;
}
var tail = self.tail;
var head = @atomicLoad(Pos, &self.head, .Monotonic);
while (true) {
if (batch.isEmpty())
return null;
if (tail -% head < self.buffer.len) {
while (tail -% head < self.buffer.len) {
const task = batch.pop() orelse break;
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered);
tail +%= 1;
}
@atomicStore(Pos, &self.tail, tail, .Release);
std.Thread.spinLoopHint();
head = @atomicLoad(Pos, &self.head, .Monotonic);
continue;
}
const new_head = head +% @intCast(Pos, self.buffer.len / 2);
if (@cmpxchgWeak(
Pos,
&self.head,
head,
new_head,
.Acquire,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
head = updated;
continue;
}
var overflowed = Batch{};
while (head != new_head) : (head +%= 1) {
const task = self.buffer[head % self.buffer.len];
overflowed.push(Batch.from(task));
}
overflowed.push(batch);
return overflowed;
}
}
fn pop(self: *BoundedQueue) ?*Task {
const tail = self.tail;
if (@atomicLoad(Pos, &self.head, .Monotonic) == tail)
return null;
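// Publish the tail decrement with SeqCst so it is ordered against the
// stealers' head updates, then re-check head to see if we raced them.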
@atomicStore(Pos, &self.tail, tail -% 1, .SeqCst);
const head = @atomicLoad(Pos, &self.head, .SeqCst);
if (tail == head) {
@atomicStore(Pos, &self.tail, tail, .Release);
return null;
}
var task: ?*Task = self.buffer[(tail -% 1) % self.buffer.len];
if (head != (tail -% 1)) {
return task;
}
if (@cmpxchgStrong(Pos, &self.head, head, head +% 1, .AcqRel, .Monotonic) != null)
task = null;
@atomicStore(Pos, &self.tail, tail, .Release);
return task;
}
fn popAndStealUnbounded(self: *BoundedQueue, target: *UnboundedQueue) ?*Task {
var consumer = target.tryAcquireConsumer() orelse return null;
defer consumer.release();
const tail = self.tail;
const head = @atomicLoad(Pos, &self.head, .Monotonic);
var new_tail = tail;
var first_task: ?*Task = null;
while (first_task == null or (new_tail -% head < self.buffer.len)) {
const task = consumer.pop() orelse break;
if (first_task == null) {
first_task = task;
} else {
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered);
new_tail +%= 1;
}
}
if (new_tail != tail)
@atomicStore(Pos, &self.tail, new_tail, .Release);
return first_task;
}
fn popOneBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Task {
while (true) : (std.Thread.spinLoopHint()) {
const target_head = @atomicLoad(Pos, &target.head, .Acquire);
const target_tail = @atomicLoad(Pos, &target.tail, .Acquire);
if (target_tail == target_head)
return null;
if (target_tail == target_head -% 1)
return null;
if (target_tail -% target_head > self.buffer.len)
return null;
const task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered);
_ = @cmpxchgWeak(
Pos,
&target.head,
target_head,
target_head +% 1,
.AcqRel,
.Monotonic,
) orelse {
return task;
};
}
}
fn popAndStealBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Task {
if (self == target)
return self.pop();
const head = @atomicLoad(Pos, &self.head, .Monotonic);
const tail = self.tail;
if (tail != head)
return self.pop();
return self.popOneBounded(target);
// var new_tail = tail;
// const first_task = self.popOneBounded(target) orelse return null;
// while (new_tail -% head < self.buffer.len) {
// const task = self.popOneBounded(target) orelse break;
// @atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered);
// new_tail +%= 1;
// }
// if (new_tail != tail)
// @atomicStore(Pos, &self.tail, new_tail, .Release);
// return first_task;
}
};
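// Bounded1: the variant selected as BoundedQueue above. The owner pops FIFO
// from the head via cmpxchg, and stealers migrate roughly half of the
// target's tasks into their own buffer in one go.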
const Bounded1 = struct {
head: Pos = 0,
tail: Pos = 0,
buffer: [capacity]*Task = undefined,
const Pos = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2);
const capacity = 128;
comptime {
std.debug.assert(capacity <= std.math.maxInt(Pos));
}
fn push(self: *BoundedQueue, _batch: Batch) ?Batch {
var batch = _batch;
if (batch.isEmpty()) {
return null;
}
var tail = self.tail;
var head = @atomicLoad(Pos, &self.head, .Monotonic);
while (true) {
if (batch.isEmpty())
return null;
if (tail -% head < self.buffer.len) {
while (tail -% head < self.buffer.len) {
const task = batch.pop() orelse break;
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered);
tail +%= 1;
}
@atomicStore(Pos, &self.tail, tail, .Release);
std.Thread.spinLoopHint();
head = @atomicLoad(Pos, &self.head, .Monotonic);
continue;
}
const new_head = head +% @intCast(Pos, self.buffer.len / 2);
if (@cmpxchgWeak(
Pos,
&self.head,
head,
new_head,
.Acquire,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
head = updated;
continue;
}
var overflowed = Batch{};
while (head != new_head) : (head +%= 1) {
const task = self.buffer[head % self.buffer.len];
overflowed.push(Batch.from(task));
}
overflowed.push(batch);
return overflowed;
}
}
fn pop(self: *BoundedQueue) ?*Task {
var tail = self.tail;
var head = @atomicLoad(Pos, &self.head, .Monotonic);
while (tail != head) {
head = @cmpxchgWeak(
Pos,
&self.head,
head,
head +% 1,
.Acquire,
.Monotonic,
) orelse return self.buffer[head % self.buffer.len];
std.Thread.spinLoopHint();
}
return null;
}
fn popAndStealUnbounded(self: *BoundedQueue, target: *UnboundedQueue) ?*Task {
var consumer = target.tryAcquireConsumer() orelse return null;
defer consumer.release();
const tail = self.tail;
const head = @atomicLoad(Pos, &self.head, .Monotonic);
var new_tail = tail;
var first_task: ?*Task = null;
while (first_task == null or (new_tail -% head < self.buffer.len)) {
const task = consumer.pop() orelse break;
if (first_task == null) {
first_task = task;
} else {
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered);
new_tail +%= 1;
}
}
if (new_tail != tail)
@atomicStore(Pos, &self.tail, new_tail, .Release);
return first_task;
}
fn popAndStealBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Task {
if (self == target)
return self.pop();
const head = @atomicLoad(Pos, &self.head, .Monotonic);
const tail = self.tail;
if (tail != head)
return self.pop();
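// Copy roughly half of the target's tasks into our own buffer speculatively,
// then commit the steal with a single cmpxchg on the target's head.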
var target_head = @atomicLoad(Pos, &target.head, .Monotonic);
while (true) {
const target_tail = @atomicLoad(Pos, &target.tail, .Acquire);
const target_size = target_tail -% target_head;
if (target_size == 0)
return null;
var steal = target_size - (target_size / 2);
if (steal > target.buffer.len / 2) {
std.Thread.spinLoopHint();
target_head = @atomicLoad(Pos, &target.head, .Monotonic);
continue;
}
const first_task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered);
var new_target_head = target_head +% 1;
var new_tail = tail;
steal -= 1;
while (steal > 0) : (steal -= 1) {
const task = @atomicLoad(*Task, &target.buffer[new_target_head % target.buffer.len], .Unordered);
new_target_head +%= 1;
@atomicStore(*Task, &self.buffer[new_tail % self.buffer.len], task, .Unordered);
new_tail +%= 1;
}
if (@cmpxchgWeak(
Pos,
&target.head,
target_head,
new_target_head,
.AcqRel,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
target_head = updated;
continue;
}
if (new_tail != tail)
@atomicStore(Pos, &self.tail, new_tail, .Release);
return first_task;
}
}
};
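// Platform mutex used by the lock-based queue variants: SRWLOCK on Windows,
// os_unfair_lock on Darwin, pthread_mutex_t when linking libc, and a raw
// futex-based lock on Linux.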
const Lock = if (std.builtin.os.tag == .windows)
WindowsLock
else if (std.Target.current.isDarwin())
DarwinLock
else if (std.builtin.link_libc)
PosixLock
else if (std.builtin.os.tag == .linux)
LinuxLock
else
@compileError("Unimplemented Lock primitive for platform");
const WindowsLock = struct {
srwlock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT,
const Self = @This();
pub fn deinit(self: *Self) void {
self.* = undefined;
}
pub fn tryAcquire(self: *Self) ?Held {
const status = std.os.windows.kernel32.TryAcquireSRWLockExclusive(&self.srwlock);
return if (status != std.os.windows.FALSE) Held{ .lock = self } else null;
}
pub fn acquire(self: *Self) Held {
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.srwlock);
return Held{ .lock = self };
}
pub const Held = struct {
lock: *Self,
pub fn release(self: Held) void {
std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock.srwlock);
}
};
};
const DarwinLock = extern struct {
os_unfair_lock: u32 = 0,
const Self = @This();
extern fn os_unfair_lock_lock(lock: *Self) void;
extern fn os_unfair_lock_unlock(lock: *Self) void;
pub fn deinit(self: *Self) void {
self.* = undefined;
}
pub fn acquire(self: *Self) Held {
os_unfair_lock_lock(self);
return Held{ .lock = self };
}
pub const Held = struct {
lock: *Self,
pub fn release(self: Held) void {
os_unfair_lock_unlock(self.lock);
}
};
};
const PosixLock = struct {
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER,
const Self = @This();
pub fn deinit(self: *Self) void {
const rc = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(rc == 0 or rc == std.os.EINVAL);
self.* = undefined;
}
pub fn acquire(self: *Self) Held {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
return Held{ .lock = self };
}
pub const Held = struct {
lock: *Self,
pub fn release(self: Held) void {
std.debug.assert(std.c.pthread_mutex_unlock(&self.lock.mutex) == 0);
}
};
};
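// A three-state futex lock (unlocked / locked / contended): the uncontended
// acquire and release are a single CAS / Xchg, and threads only make the
// futex_wait/futex_wake syscalls once the lock is marked contended.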
const LinuxLock = struct {
state: State = .unlocked,
const Self = @This();
const State = enum(i32) {
unlocked = 0,
locked,
contended,
};
pub fn deinit(self: *Self) void {
self.* = undefined;
}
pub fn acquire(self: *Self) Held {
if (@cmpxchgWeak(
State,
&self.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
)) |_| {
self.acquireSlow();
}
return Held{ .lock = self };
}
fn acquireSlow(self: *Self) void {
@setCold(true);
var spin: usize = 0;
var lock_state = State.locked;
var state = @atomicLoad(State, &self.state, .Monotonic);
while (true) {
if (state == .unlocked) {
state = @cmpxchgWeak(
State,
&self.state,
state,
lock_state,
.Acquire,
.Monotonic,
) orelse return;
std.Thread.spinLoopHint();
continue;
}
if (state == .locked and spin < 100) {
spin += 1;
std.Thread.spinLoopHint();
state = @atomicLoad(State, &self.state, .Monotonic);
continue;
}
if (state != .contended) {
if (@cmpxchgWeak(
State,
&self.state,
state,
.contended,
.Monotonic,
.Monotonic,
)) |updated| {
std.Thread.spinLoopHint();
state = updated;
continue;
}
}
switch (std.os.linux.getErrno(std.os.linux.futex_wait(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT,
@enumToInt(State.contended),
null,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
std.os.ETIMEDOUT => unreachable,
else => unreachable,
}
spin = 0;
lock_state = .contended;
state = @atomicLoad(State, &self.state, .Monotonic);
}
}
pub const Held = struct {
lock: *Self,
pub fn release(self: Held) void {
switch (@atomicRmw(State, &self.lock.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
.contended => self.releaseSlow(),
}
}
fn releaseSlow(self: Held) void {
@setCold(true);
switch (std.os.linux.getErrno(std.os.linux.futex_wake(
@ptrCast(*const i32, &self.lock.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE,
1,
))) {
0 => {},
std.os.EINVAL => {},
std.os.EACCES => {},
std.os.EFAULT => {},
else => unreachable,
}
}
};
};
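// One-shot events used to park and wake worker threads, again selected per
// platform: SRWLOCK + condition variable on Windows, pthread mutex/cond with
// libc, and a raw futex on Linux.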
const Event = if (std.builtin.os.tag == .windows)
WindowsEvent
else if (std.builtin.link_libc)
PosixEvent
else if (std.builtin.os.tag == .linux)
LinuxEvent
else
@compileError("Unimplemented Event primitive for platform");
const WindowsEvent = struct {
is_set: bool = false,
lock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT,
cond: std.os.windows.CONDITION_VARIABLE = std.os.windows.CONDITION_VARIABLE_INIT,
const Self = @This();
pub fn deinit(self: *Self) void {
self.* = undefined;
}
pub fn set(self: *Self) void {
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock);
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock);
self.is_set = true;
std.os.windows.kernel32.WakeConditionVariable(&self.cond);
}
threadlocal var tls_frequency: u64 = 0;
pub fn wait(self: *Self, timeout: ?u64) bool {
var counter: u64 = undefined;
var frequency: u64 = undefined;
if (timeout) |timeout_ns| {
counter = std.os.windows.QueryPerformanceCounter();
frequency = tls_frequency;
if (frequency == 0) {
frequency = std.os.windows.QueryPerformanceFrequency();
tls_frequency = frequency;
}
}
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock);
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock);
while (!self.is_set) {
var timeout_ms: std.os.windows.DWORD = std.os.windows.INFINITE;
if (timeout) |timeout_ns| {
const elapsed = blk: {
const now = std.os.windows.QueryPerformanceCounter();
const a = if (now >= counter) (now - counter) else 0;
const b = std.time.ns_per_s;
const c = frequency;
break :blk (((a / c) * b) + ((a % c) * b) / c);
};
if (elapsed > timeout_ns) {
return false;
} else {
const delay_ms = @divFloor(timeout_ns - elapsed, std.time.ns_per_ms);
timeout_ms = std.math.cast(std.os.windows.DWORD, delay_ms) catch timeout_ms;
}
}
const status = std.os.windows.kernel32.SleepConditionVariableSRW(
&self.cond,
&self.lock,
timeout_ms,
0,
);
if (status == std.os.windows.FALSE) {
switch (std.os.windows.kernel32.GetLastError()) {
.TIMEOUT => {},
else => |err| {
_ = std.os.windows.unexpectedError(err);
unreachable;
},
}
}
}
return true;
}
};
const PosixEvent = struct {
mutex: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER,
cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER,
is_set: bool = false,
const Self = @This();
pub fn deinit(self: *Self) void {
const m = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(m == 0 or m == std.os.EINVAL);
const c = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(c == 0 or c == std.os.EINVAL);
self.* = undefined;
}
pub fn set(self: *Self) void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
self.is_set = true;
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
}
pub fn wait(self: *Self, timeout: ?u64) bool {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
var deadline: ?u64 = null;
if (timeout) |timeout_ns| {
deadline = timestamp(std.os.CLOCK_MONOTONIC) + timeout_ns;
}
while (!self.is_set) {
const deadline_ns = deadline orelse {
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0);
continue;
};
const monotonic_now = timestamp(std.os.CLOCK_MONOTONIC);
if (monotonic_now >= deadline_ns)
return false;
// pthread_cond_timedwait takes an absolute CLOCK_REALTIME deadline, so
// convert the remaining monotonic delay into realtime before building `ts`.
const now_ns = timestamp(std.os.CLOCK_REALTIME) + (deadline_ns - monotonic_now);
var ts: std.os.timespec = undefined;
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(now_ns, std.time.ns_per_s));
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(now_ns, std.time.ns_per_s));
const rc = std.c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
std.debug.assert(rc == 0 or rc == std.os.ETIMEDOUT);
}
return true;
}
fn timestamp(comptime clock: u32) u64 {
if (comptime std.Target.current.isDarwin()) {
switch (clock) {
std.os.CLOCK_REALTIME => {
var tv: std.os.timeval = undefined;
std.os.gettimeofday(&tv, null);
return (@intCast(u64, tv.tv_sec) * std.time.ns_per_s) + (@intCast(u64, tv.tv_usec) * std.time.ns_per_us);
},
std.os.CLOCK_MONOTONIC => {
var info: std.os.darwin.mach_timebase_info_data = undefined;
std.os.darwin.mach_timebase_info(&info);
var counter = std.os.darwin.mach_absolute_time();
if (info.numer > 1)
counter *= info.numer;
if (info.denom > 1)
counter /= info.denom;
return counter;
},
else => unreachable,
}
}
var ts: std.os.timespec = undefined;
std.os.clock_gettime(clock, &ts) catch return 0;
return (@intCast(u64, ts.tv_sec) * std.time.ns_per_s) + @intCast(u64, ts.tv_nsec);
}
};
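// Futex-based one-shot event: set() flips the state and wakes one waiter,
// wait() futex-waits on the `unset` value and re-arms its relative timeout
// from a CLOCK_MONOTONIC deadline on every spurious wakeup.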
const LinuxEvent = struct {
state: State = .unset,
const Self = @This();
const State = enum(i32) {
unset = 0,
set,
};
pub fn deinit(self: *Self) void {
self.* = undefined;
}
pub fn set(self: *Self) void {
@atomicStore(State, &self.state, .set, .Release);
switch (std.os.linux.getErrno(std.os.linux.futex_wake(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE,
1,
))) {
0 => {},
std.os.EINVAL => {},
std.os.EACCES => {},
std.os.EFAULT => {},
else => unreachable,
}
}
pub fn wait(self: *Self, timeout: ?u64) bool {
var deadline: ?u64 = null;
while (true) {
if (@atomicLoad(State, &self.state, .Acquire) == .set)
return true;
var ts: std.os.timespec = undefined;
var ts_ptr: ?*std.os.timespec = null;
if (timeout) |timeout_ns| {
const delay_ns = delay: {
std.os.clock_gettime(std.os.CLOCK_MONOTONIC, &ts) catch return false;
const now_ns = (@intCast(u64, ts.tv_sec) * std.time.ns_per_s) + @intCast(u64, ts.tv_nsec);
if (deadline) |deadline_ns| {
if (now_ns >= deadline_ns)
return false;
break :delay (deadline_ns - now_ns);
} else {
deadline = now_ns + timeout_ns;
break :delay timeout_ns;
}
};
ts_ptr = &ts;
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(delay_ns, std.time.ns_per_s));
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(delay_ns, std.time.ns_per_s));
}
switch (std.os.linux.getErrno(std.os.linux.futex_wait(
@ptrCast(*const i32, &self.state),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT,
@enumToInt(State.unset),
ts_ptr,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
std.os.ETIMEDOUT => return false,
else => unreachable,
}
}
}
};
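// Minimal thread abstraction: CreateThread on Windows, pthreads with libc,
// or a hand-rolled clone()-based implementation on Linux without libc.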
const Thread = if (std.builtin.os.tag == .windows)
WindowsThread
else if (std.builtin.link_libc)
PosixThread
else if (std.builtin.os.tag == .linux)
LinuxThread
else
@compileError("Unimplemented Thread primitive for platform");
const WindowsThread = struct {
handle: std.os.windows.HANDLE,
const Self = @This();
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self {
const Context = @TypeOf(context);
const Wrapper = struct {
fn entry(raw_arg: std.os.windows.LPVOID) callconv(.C) std.os.windows.DWORD {
entryFn(@ptrCast(Context, @alignCast(@alignOf(Context), raw_arg)));
return 0;
}
};
const handle = std.os.windows.kernel32.CreateThread(
null,
stack_size,
Wrapper.entry,
@ptrCast(std.os.windows.LPVOID, context),
0,
null,
) orelse return error.SpawnError;
return Self{ .handle = handle };
}
pub fn join(self: Self) void {
std.os.windows.WaitForSingleObjectEx(self.handle, std.os.windows.INFINITE, false) catch unreachable;
std.os.windows.CloseHandle(self.handle);
}
};
const PosixThread = struct {
handle: std.c.pthread_t,
const Self = @This();
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self {
const Context = @TypeOf(context);
const Wrapper = struct {
fn entry(raw_arg: ?*c_void) callconv(.C) ?*c_void {
entryFn(@ptrCast(Context, @alignCast(@alignOf(Context), raw_arg)));
return null;
}
};
var attr: std.c.pthread_attr_t = undefined;
if (std.c.pthread_attr_init(&attr) != 0)
return error.SystemResources;
defer std.debug.assert(std.c.pthread_attr_destroy(&attr) == 0);
if (std.c.pthread_attr_setstacksize(&attr, stack_size) != 0)
return error.SystemResources;
var handle: std.c.pthread_t = undefined;
const rc = std.c.pthread_create(
&handle,
&attr,
Wrapper.entry,
@ptrCast(?*c_void, context),
);
return switch (rc) {
0 => Self{ .handle = handle },
else => error.SpawnError,
};
}
pub fn join(self: Self) void {
const rc = std.c.pthread_join(self.handle, null);
std.debug.assert(rc == 0);
}
};
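// libc-free Linux threads: a single mmap holds the guard page, the stack,
// the Info block, and the TLS area. clone() is called with
// CLONE_CHILD_CLEARTID so join() can futex_wait on the tid word and munmap
// the whole region once it drops to zero.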
const LinuxThread = struct {
info: *Info,
const Self = @This();
const Info = struct {
mmap_ptr: usize,
mmap_len: usize,
context: usize,
handle: i32,
};
pub fn spawn(stack_size: usize, context: anytype, comptime entryFn: anytype) !Self {
var mmap_size: usize = std.mem.page_size;
const guard_end = mmap_size;
mmap_size = std.mem.alignForward(mmap_size + stack_size, std.mem.page_size);
const stack_end = mmap_size;
mmap_size = std.mem.alignForward(mmap_size, @alignOf(Info));
const info_begin = mmap_size;
mmap_size = std.mem.alignForward(mmap_size + @sizeOf(Info), std.os.linux.tls.tls_image.alloc_align);
const tls_begin = mmap_size;
mmap_size = std.mem.alignForward(mmap_size + std.os.linux.tls.tls_image.alloc_size, std.mem.page_size);
const mmap_bytes = try std.os.mmap(
null,
mmap_size,
std.os.PROT_NONE,
std.os.MAP_PRIVATE | std.os.MAP_ANONYMOUS,
-1,
0,
);
errdefer std.os.munmap(mmap_bytes);
try std.os.mprotect(
mmap_bytes[guard_end..],
std.os.PROT_READ | std.os.PROT_WRITE,
);
const info = @ptrCast(*Info, @alignCast(@alignOf(Info), &mmap_bytes[info_begin]));
info.* = .{
.mmap_ptr = @ptrToInt(mmap_bytes.ptr),
.mmap_len = mmap_bytes.len,
.context = @ptrToInt(context),
.handle = undefined,
};
var user_desc: switch (std.builtin.arch) {
.i386 => std.os.linux.user_desc,
else => void,
} = undefined;
var tls_ptr = std.os.linux.tls.prepareTLS(mmap_bytes[tls_begin..]);
if (std.builtin.arch == .i386) {
defer tls_ptr = @ptrToInt(&user_desc);
user_desc = .{
.entry_number = std.os.linux.tls.tls_image.gdt_entry_number,
.base_addr = tls_ptr,
.limit = 0xfffff,
.seg_32bit = 1,
.contents = 0,
.read_exec_only = 0,
.limit_in_pages = 1,
.seg_not_present = 0,
.useable = 1,
};
}
const flags: u32 =
std.os.CLONE_SIGHAND | std.os.CLONE_SYSVSEM |
std.os.CLONE_VM | std.os.CLONE_FS | std.os.CLONE_FILES |
std.os.CLONE_PARENT_SETTID | std.os.CLONE_CHILD_CLEARTID |
std.os.CLONE_THREAD | std.os.CLONE_DETACHED | std.os.CLONE_SETTLS;
const Context = @TypeOf(context);
const Wrapper = struct {
fn entry(raw_arg: usize) callconv(.C) u8 {
const info_ptr = @intToPtr(*Info, raw_arg);
entryFn(@intToPtr(Context, info_ptr.context));
return 0;
}
};
const rc = std.os.linux.clone(
Wrapper.entry,
@ptrToInt(&mmap_bytes[stack_end]),
flags,
@ptrToInt(info),
&info.handle,
tls_ptr,
&info.handle,
);
return switch (std.os.linux.getErrno(rc)) {
0 => Self{ .info = info },
else => error.SpawnError,
};
}
pub fn join(self: Self) void {
while (true) {
const tid = @atomicLoad(i32, &self.info.handle, .SeqCst);
if (tid == 0) {
std.os.munmap(@intToPtr([*]align(std.mem.page_size) u8, self.info.mmap_ptr)[0..self.info.mmap_len]);
return;
}
const rc = std.os.linux.futex_wait(&self.info.handle, std.os.linux.FUTEX_WAIT, tid, null);
switch (std.os.linux.getErrno(rc)) {
0 => continue,
std.os.EINTR => continue,
std.os.EAGAIN => continue,
else => unreachable,
}
}
}
};
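// Benchmark entry point: recursively forks fib-style tasks onto the loop
// (FibRecurse spawns frames purely for scheduling load, FibNaive also sums
// the results) and runs fib(30) with the platform-appropriate allocator.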
pub fn main() !void {
const FibRecurse = struct {
fn call(allocator: *std.mem.Allocator, _n: usize) anyerror!void {
var n = _n;
if (n <= 1)
return;
suspend {
var task = Loop.Task.init(@frame());
Loop.schedule(&task, .High);
}
var tasks = std.ArrayList(*@Frame(@This().call)).init(allocator);
defer tasks.deinit();
var err: ?anyerror = null;
while (n > 1) {
const f = allocator.create(@Frame(@This().call)) catch |e| {
err = e;
break;
};
tasks.append(f) catch |e| {
err = e;
break;
};
f.* = async @This().call(allocator, n - 2);
n = n - 1;
}
for (tasks.items) |frame| {
(await frame) catch |e| {
if (err == null)
err = e;
};
}
return err orelse {};
}
};
const FibNaive = struct {
fn call(allocator: *std.mem.Allocator, n: usize) anyerror!usize {
if (n <= 1)
return n;
suspend {
var task = Loop.Task.init(@frame());
Loop.schedule(&task, .High);
}
const l = try allocator.create(@Frame(@This().call));
defer allocator.destroy(l);
const r = try allocator.create(@Frame(@This().call));
defer allocator.destroy(r);
l.* = async @This().call(allocator, n - 1);
r.* = async @This().call(allocator, n - 2);
const lv = await l;
const rv = await r;
const lc = try lv;
const rc = try rv;
return (lc + rc);
}
};
const Fib = FibRecurse;
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
var win_heap = if (std.builtin.os.tag == .windows) std.heap.HeapAllocator.init() else {};
const allocator = if (std.builtin.link_libc)
std.heap.c_allocator
else if (std.builtin.os.tag == .windows)
&win_heap.allocator
else
&gpa.allocator;
_ = try (try Loop.run(.{}, Fib.call, .{allocator, 30}));
}