Skip to content

Instantly share code, notes, and snippets.

@lithdew
Last active December 20, 2020 15:36
Show Gist options
  • Save lithdew/44cb615536477dbbdf120cace1c069a5 to your computer and use it in GitHub Desktop.
Save lithdew/44cb615536477dbbdf120cace1c069a5 to your computer and use it in GitHub Desktop.
pike: timeout wheel integration
const std = @import("std");
const mem = std.mem;
const meta = std.meta;
const math = std.math;
const debug = std.debug;
const testing = std.testing;
const assert = debug.assert;
pub const Wheel = GenericWheel(4, 6);
pub fn GenericWheel(comptime num_wheels: comptime_int, comptime num_bits_per_wheel: comptime_int) type {
comptime const num_wheel_slots = 1 << num_bits_per_wheel;
return struct {
pub const Unit = meta.Int(.unsigned, num_wheels * num_bits_per_wheel);
pub const Timer = struct {
parent: ?*List = null,
node: List.Node = .{},
expires: Unit = 0,
};
const SlotMask = math.IntFittingRange(0, (1 << num_wheel_slots) - 1);
const Number = math.IntFittingRange(0, num_wheels - 1);
const Slot = math.IntFittingRange(0, num_wheel_slots - 1);
const Self = @This();
wheels: [num_wheels][num_wheel_slots]List = init: {
break :init [_][num_wheel_slots]List{[_]List{.{}} ** num_wheel_slots} ** num_wheels;
},
pending: [num_wheels]SlotMask = init: {
break :init [_]SlotMask{0} ** num_wheels;
},
expired: List = .{},
current: Unit = 0,
pub inline fn scheduleUntil(self: *Self, timer: *Timer, expires: Unit) void {
self.schedule(timer, expires);
}
pub inline fn scheduleFor(self: *Self, timer: *Timer, duration: Unit) void {
self.schedule(timer, self.current + duration);
}
pub inline fn cancel(self: *Self, timer: *Timer) void {
self.remove(timer);
}
pub inline fn step(self: *Self, duration: Unit) void {
self.update(self.current + duration);
}
pub fn get(self: *Self) ?*Timer {
const node = self.expired.pop() orelse return null;
const timer = @fieldParentPtr(Timer, "node", node);
self.remove(timer);
return timer;
}
pub fn next(self: *const Self) ?Unit {
if (self.hasExpiredEntries()) {
return 0;
} else if (!self.hasPendingEntries()) {
return null;
}
var timeout = ~@as(Unit, 0);
comptime var wheel = 0;
comptime var iterator_mask = @as(Unit, 0);
inline while (wheel < num_wheels) : (wheel += 1) {
comptime const wheel_offset = @as(math.Log2Int(Unit), wheel) * num_bits_per_wheel;
const mask: SlotMask = self.pending[wheel];
if (mask != 0) {
const slot: SlotMask = @truncate(Slot, self.current >> wheel_offset);
const rotate = @as(u1, if (wheel != 0) 1 else 0);
const progress = @as(Unit, @ctz(SlotMask, math.rotr(SlotMask, mask, slot)) + rotate) << wheel_offset;
timeout = math.min(progress - (self.current & iterator_mask), timeout);
}
iterator_mask <<= num_bits_per_wheel;
iterator_mask |= (num_wheel_slots - 1);
}
return timeout;
}
pub fn update(self: *Self, current: Unit) void {
var elapsed: Unit = current - self.current;
var todo: List = .{};
comptime var wheel = 0;
inline while (wheel < num_wheels) : (wheel += 1) {
comptime const wheel_offset = @as(math.Log2Int(Unit), wheel) * num_bits_per_wheel;
// Construct a bitmask where all set bits represent the elapsed window of
// slot indices of the timer wheel.
const elapsed_slots: SlotMask = mask: {
const elapsed_wheel_offset = elapsed >> wheel_offset;
if (elapsed_wheel_offset >= num_wheel_slots) {
break :mask ~@as(SlotMask, 0);
} else {
const elapsed_pos = @truncate(Slot, elapsed_wheel_offset);
const old_pos = @truncate(Slot, self.current >> wheel_offset);
const new_pos = @truncate(Slot, current >> wheel_offset);
const elapsed_bit = (@as(SlotMask, 1) << @as(math.Log2Int(SlotMask), elapsed_pos)) - 1;
const old_bit_pos = math.rotl(SlotMask, elapsed_bit, @as(SlotMask, old_pos));
const diff_bit_pos = math.rotl(SlotMask, elapsed_bit, @as(SlotMask, new_pos));
const new_bit_pos = math.rotr(SlotMask, diff_bit_pos, @as(SlotMask, elapsed_pos));
const end_bit_pos = @as(SlotMask, 1) << new_pos;
break :mask old_bit_pos | new_bit_pos | end_bit_pos;
}
};
// Reschedule all elapsed timers.
while (elapsed_slots & self.pending[wheel] != 0) {
const slot = @truncate(Slot, @ctz(SlotMask, self.pending[wheel]));
todo.concat(&self.wheels[wheel][slot]);
self.unmarkPending(wheel, slot);
}
if (elapsed_slots & 0x1 == 0) {
break;
}
elapsed = math.max(elapsed, @as(Unit, num_wheel_slots) << wheel_offset);
}
self.current = current;
while (todo.pop()) |node| {
const timer = @fieldParentPtr(Timer, "node", node);
self.schedule(timer, timer.expires);
}
}
pub inline fn hasPendingEntries(self: *const Self) bool {
comptime var wheel = 0;
inline while (wheel < num_wheels) : (wheel += 1) {
if (self.pending[wheel] != 0) return true;
}
return false;
}
pub inline fn hasExpiredEntries(self: *const Self) bool {
return !self.expired.empty();
}
fn remove(self: *Self, timer: *Timer) void {
if (timer.parent) |parent| {
parent.remove(&timer.node);
// Remove from pending list if it's the last entry.
if (parent != &self.expired and parent.empty()) {
const index = (@ptrToInt(parent) - @ptrToInt(&self.wheels[0][0])) / @sizeOf(List);
const wheel = @intCast(Number, index / num_wheel_slots);
const slot = @intCast(Slot, index % num_wheel_slots);
self.unmarkPending(wheel, slot);
}
timer.parent = null;
timer.node = .{};
}
}
fn schedule(self: *Self, timer: *Timer, expires: Unit) void {
self.remove(timer);
timer.expires = expires;
// If the expiry time elapses the current time, append it to the
// expired timer list.
if (expires <= self.current) {
self.expired.append(&timer.node);
timer.parent = &self.expired;
return;
}
// Else, append it to its respective (wheel, slot) pending timer
// list.
const remaining: Unit = expires - self.current;
const wheel = getWheelNum(remaining);
const slot = getWheelSlot(wheel, expires);
self.wheels[wheel][slot].append(&timer.node);
timer.parent = &self.wheels[wheel][slot];
self.markPending(wheel, slot);
}
inline fn fls(x: anytype) @TypeOf(x) {
return meta.bitCount(@TypeOf(x)) - @clz(@TypeOf(x), x);
}
inline fn getWheelNum(time: Unit) Number {
assert(time != 0);
const wheel: Unit = fls(math.min(time, math.maxInt(Unit))) - 1;
return @truncate(Number, @intCast(Slot, wheel) / num_bits_per_wheel);
}
inline fn getWheelSlot(wheel: Number, time: Unit) Slot {
const offset = @as(math.Log2Int(Unit), wheel) * num_bits_per_wheel;
const rotate = @as(u1, if (wheel != 0) 1 else 0);
return @truncate(Slot, (time >> offset) - rotate);
}
inline fn markPending(self: *Self, wheel: Number, slot: Slot) void {
self.pending[wheel] |= @as(SlotMask, 1) << @as(math.Log2Int(SlotMask), slot);
}
inline fn unmarkPending(self: *Self, wheel: Number, slot: Slot) void {
self.pending[wheel] &= ~(@as(SlotMask, 1)) << @as(math.Log2Int(SlotMask), slot);
}
};
}
pub const List = struct {
pub const Node = struct {
next: ?*Node = null,
prev: ?*Node = null,
};
head: ?*Node = null,
tail: ?*Node = null,
pub inline fn empty(self: *const List) bool {
return self.head == null;
}
pub inline fn concat(self: *List, other: *List) void {
if (self.tail) |tail| {
tail.next = other.head;
if (tail.next) |next| {
next.prev = tail;
}
} else {
self.head = other.head;
}
self.tail = other.tail;
other.head = null;
other.tail = null;
}
pub inline fn prepend(self: *List, node: *Node) void {
if (self.head) |head| {
head.prev = node;
node.next = head;
} else {
self.tail = node;
}
self.head = node;
}
pub inline fn append(self: *List, node: *Node) void {
if (self.tail) |tail| {
tail.next = node;
node.prev = tail;
} else {
self.head = node;
}
self.tail = node;
}
pub inline fn shift(self: *List) ?*Node {
const node = self.head orelse return null;
self.head = node.next;
if (node.next) |next| {
next.prev = null;
} else {
self.tail = null;
}
}
pub inline fn pop(self: *List) ?*Node {
const node = self.tail orelse return null;
self.tail = node.prev;
if (node.prev) |prev| {
prev.next = null;
} else {
self.head = null;
}
return node;
}
pub inline fn remove(self: *List, node: *Node) void {
if (node.prev) |prev| {
prev.next = node.next;
} else {
self.head = node.next;
}
if (node.next) |next| {
next.prev = node.prev;
} else {
self.tail = node.prev;
}
}
};
test "Wheel.getWheelNum() / Wheel.getWheelSlot()" {
const current_time: Wheel.Unit = 0;
const expires_at: Wheel.Unit = 4095;
const remaining_time: Wheel.Unit = expires_at - current_time;
const wheel_num = Wheel.getWheelNum(remaining_time);
const wheel_slot = Wheel.getWheelSlot(wheel_num, expires_at);
testing.expectEqual(@as(Wheel.Number, 1), wheel_num);
testing.expectEqual(@as(Wheel.Slot, 62), wheel_slot);
}
test "Wheel: scheduleUntil() / remove() / update() / step()" {
var wheel: Wheel = .{};
var timer_a: Wheel.Timer = .{};
var timer_b: Wheel.Timer = .{};
var timer_c: Wheel.Timer = .{};
wheel.scheduleUntil(&timer_a, 1);
wheel.scheduleUntil(&timer_b, 3);
wheel.scheduleUntil(&timer_c, 5);
wheel.update(1);
testing.expectEqual(@as(?*Wheel.Timer, &timer_a), wheel.get());
testing.expectEqual(@as(?Wheel.Unit, 2), wheel.next());
wheel.cancel(&timer_b);
testing.expectEqual(@as(?Wheel.Unit, 4), wheel.next());
wheel.step(4);
testing.expect(!wheel.hasPendingEntries());
testing.expect(wheel.hasExpiredEntries());
}
test "Wheel: scheduleUntil() / step() / update() over lowest-order timer wheel" {
var i: Wheel.Unit = 1;
while (i < 64) : (i += 1) {
var wheel: Wheel = .{};
var timer: Wheel.Timer = .{};
wheel.scheduleUntil(&timer, i);
wheel.update(0);
testing.expectEqual(@as(?Wheel.Unit, i), wheel.next());
testing.expect(wheel.hasPendingEntries());
testing.expect(!wheel.hasExpiredEntries());
if (i % 2 == 0) {
wheel.step(wheel.next() orelse unreachable);
} else {
wheel.update(i);
}
testing.expectEqual(@as(?Wheel.Unit, 0), wheel.next());
testing.expect(!wheel.hasPendingEntries());
testing.expect(wheel.hasExpiredEntries());
testing.expectEqual(@as(?*Wheel.Timer, &timer), wheel.get());
}
}
test "Wheel: scheduleUntil() / step() / update() over higher-order timer wheel" {
inline for (.{ 64, 4096 }) |cap| {
var i: Wheel.Unit = 1;
while (i < 64) : (i += 1) {
var wheel: Wheel = .{};
var timers = [_]Wheel.Timer{.{}} ** cap;
for (timers) |*timer, pos| {
wheel.scheduleUntil(timer, (i * cap) + @truncate(Wheel.Unit, pos));
wheel.update(0);
testing.expectEqual(@as(?Wheel.Unit, i * cap), wheel.next());
testing.expect(wheel.hasPendingEntries());
testing.expect(!wheel.hasExpiredEntries());
}
wheel.step((i + 1) * cap);
testing.expectEqual(@as(?Wheel.Unit, 0), wheel.next());
testing.expect(!wheel.hasPendingEntries());
testing.expect(wheel.hasExpiredEntries());
for (timers) |_| {
testing.expect(wheel.get() != null);
}
testing.expect(!wheel.hasPendingEntries());
testing.expect(!wheel.hasExpiredEntries());
}
}
}
const std = @import("std");
const pike = @import("pike");
const TimeoutWheel = @import("timeout.zig").Wheel;
const TimeoutList = @import("timeout.zig").List;
const time = std.time;
pub const Wheel = struct {
pub const Timeout = struct {
inner: TimeoutWheel.Timer = .{},
task: pike.Task = undefined,
dead: bool = false,
};
lock: std.Mutex = .{},
inner: TimeoutWheel = .{},
timer: time.Timer,
event: pike.Event,
pub fn init() !Wheel {
var timer = try time.Timer.start();
var event = try pike.Event.init();
errdefer event.deinit();
return Wheel{ .timer = timer, .event = event };
}
pub fn deinit(self: *Wheel) void {
var todo: TimeoutList = .{};
{
const held = self.lock.acquire();
defer held.release();
for (self.inner.wheels) |*slots| {
for (slots) |*list| {
todo.concat(list);
}
}
todo.concat(&self.inner.expired);
}
while (todo.pop()) |node| {
const inner = @fieldParentPtr(TimeoutWheel.Timer, "node", node);
const timeout = @fieldParentPtr(Timeout, "inner", inner);
timeout.dead = true;
pike.dispatch(&timeout.task, .{});
}
self.event.deinit();
}
pub inline fn registerTo(self: *Wheel, notifier: *const pike.Notifier) !void {
try self.event.registerTo(notifier);
}
pub fn cancel(self: *Wheel, timeout: *Timeout) void {
var pending = false;
{
const held = self.lock.acquire();
defer held.release();
pending = timeout.inner.parent != null;
self.inner.cancel(timeout);
}
if (pending) {
timeout.dead = true;
pike.dispatch(&timeout.task, .{});
}
}
pub inline fn waitFor(self: *Wheel, duration_ms: usize) !void {
var timeout: Timeout = .{};
try self.waitForAsync(&timeout, duration_ms);
}
pub inline fn waitUntil(self: *Wheel, time_ms: usize) !void {
var timeout: Timeout = .{};
try self.waitUntilAsync(&timeout, time_ms);
}
pub fn waitForAsync(self: *Wheel, timeout: *Timeout, duration_ms: usize) !void {
timeout.task = pike.Task.init(@frame());
var frame: @Frame(pike.Event.post) = undefined;
suspend {
const held = self.lock.acquire();
defer held.release();
const units = @truncate(TimeoutWheel.Unit, duration_ms);
self.inner.scheduleFor(&timeout.inner, units);
frame = async self.event.post();
}
if (timeout.dead) {
return error.OperationCancelled;
}
try await frame;
}
pub fn waitUntilAsync(self: *Wheel, timeout: *Timeout, time_ms: usize) !void {
timeout.task = pike.Task.init(@frame());
var frame: @Frame(pike.Event.post) = undefined;
suspend {
const held = self.lock.acquire();
defer held.release();
const units = @truncate(TimeoutWheel.Unit, time_ms);
self.inner.scheduleUntil(&timeout.inner, units);
frame = async self.event.post();
}
if (timeout.dead) {
return error.OperationCancelled;
}
try await frame;
}
pub inline fn next(self: *Wheel) ?TimeoutWheel.Unit {
const held = self.lock.acquire();
defer held.release();
return self.inner.next();
}
pub inline fn update(self: *Wheel) void {
var todo: TimeoutList = .{};
{
const held = self.lock.acquire();
defer held.release();
const ticks = self.timer.lap() / time.ns_per_ms;
self.inner.step(@truncate(TimeoutWheel.Unit, ticks));
while (self.inner.get()) |timeout| todo.append(&timeout.node);
}
while (todo.pop()) |node| {
const inner = @fieldParentPtr(TimeoutWheel.Timer, "node", node);
const timeout = @fieldParentPtr(Timeout, "inner", inner);
pike.dispatch(&timeout.task, .{});
}
}
};
pub inline fn waitUntil(wheel: *Wheel, comptime duration_ms: comptime_int) !void {
try wheel.waitUntil(duration_ms);
std.debug.print("{} ms has elapsed!\n", .{duration_ms});
}
pub fn run(wheel: *Wheel) !void {
try waitUntil(wheel, 1000);
inline for ([_]u8{undefined} ** 10) |_, i| {
try waitUntil(wheel, 1000 + (i + 1) * 100);
}
try waitUntil(wheel, 3000);
try waitUntil(wheel, 4000);
try waitUntil(wheel, 5000);
}
pub fn main() !void {
const notifier = try pike.Notifier.init();
defer notifier.deinit();
var wheel = try Wheel.init();
defer wheel.deinit();
try wheel.registerTo(&notifier);
var frame = async run(&wheel);
while (true) {
wheel.update();
try notifier.poll(wheel.next() orelse break);
wheel.update();
}
try nosuspend await frame;
std.debug.print("Done!\n", .{});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment