Skip to content

Instantly share code, notes, and snippets.

@lithdew
Created December 22, 2020 23:33
Show Gist options
  • Save lithdew/c9eca524ba7dbb75459201adf0904e1f to your computer and use it in GitHub Desktop.
Save lithdew/c9eca524ba7dbb75459201adf0904e1f to your computer and use it in GitHub Desktop.
const std = @import("std");
const pike = @import("pike");
const snow = @import("snow");
const timeout = @import("timeout.zig");
const time = std.time;
const Self = @This();
pub const Handle = struct {
inner: timeout.Wheel.Timer = .{},
task: pike.Task = undefined,
dead: bool = false,
};
lock: std.Mutex = .{},
inner: timeout.Wheel = .{},
timer: time.Timer,
event: pike.Event,
dead: bool = false,
pub fn init() !Self {
var timer = try time.Timer.start();
var event = try pike.Event.init();
errdefer event.deinit();
return Self{ .timer = timer, .event = event };
}
pub fn deinit(self: *Self) void {
var todo: timeout.List = .{};
{
const held = self.lock.acquire();
defer held.release();
self.dead = true;
for (self.inner.wheels) |*slots| {
for (slots) |*list| {
todo.concat(list);
}
}
todo.concat(&self.inner.expired);
}
while (todo.pop()) |node| {
const inner = @fieldParentPtr(timeout.Wheel.Timer, "node", node);
const handle = @fieldParentPtr(Handle, "inner", inner);
handle.dead = true;
pike.dispatch(&handle.task, .{ .use_lifo = true });
}
self.event.deinit();
}
pub inline fn registerTo(self: *Self, notifier: *const pike.Notifier) !void {
try self.event.registerTo(notifier);
}
pub fn cancel(self: *Self, handle: *Handle) void {
var pending = false;
{
const held = self.lock.acquire();
defer held.release();
pending = handle.inner.parent != null;
self.inner.cancel(handle);
}
if (pending) {
handle.dead = true;
pike.dispatch(&handle.task, .{ .use_lifo = true });
}
}
pub inline fn waitFor(self: *Self, duration_ms: usize) !void {
var handle: Handle = .{};
try self.waitForAsync(&handle, duration_ms);
}
pub inline fn waitUntil(self: *Self, time_ms: usize) !void {
var handle: Handle = .{};
try self.waitUntilAsync(&handle, time_ms);
}
pub fn waitForAsync(self: *Self, handle: *Handle, duration_ms: usize) !void {
const held = self.lock.acquire();
if (self.dead) {
held.release();
return error.AlreadyShutdown;
}
handle.task = pike.Task.init(@frame());
var frame: @Frame(pike.Event.post) = undefined;
suspend {
const units = @truncate(timeout.Wheel.Unit, duration_ms);
self.inner.scheduleFor(&handle.inner, units);
frame = async self.event.post();
held.release();
}
try await frame;
if (handle.dead) {
return error.OperationCancelled;
}
}
pub fn waitUntilAsync(self: *Self, handle: *Handle, time_ms: usize) !void {
const held = self.lock.acquire();
if (self.dead) {
held.release();
return error.AlreadyShutdown;
}
handle.task = pike.Task.init(@frame());
var frame: @Frame(pike.Event.post) = undefined;
suspend {
const units = @truncate(timeout.Wheel.Unit, time_ms);
self.inner.scheduleUntil(&handle.inner, units);
frame = async self.event.post();
held.release();
}
try await frame;
if (handle.dead) {
return error.OperationCancelled;
}
}
pub inline fn update(self: *Self) ?timeout.Wheel.Unit {
var todo: timeout.List = .{};
{
const held = self.lock.acquire();
defer held.release();
const ticks = self.timer.lap() / time.ns_per_ms;
if (ticks != 0) {
self.inner.step(@truncate(timeout.Wheel.Unit, ticks));
}
while (self.inner.get()) |handle| todo.append(&handle.node);
}
while (todo.pop()) |node| {
const inner = @fieldParentPtr(timeout.Wheel.Timer, "node", node);
const handle = @fieldParentPtr(Handle, "inner", inner);
pike.dispatch(&handle.task, .{ .use_lifo = true });
}
return self.inner.next();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment