Last active
December 20, 2020 15:36
-
-
Save lithdew/44cb615536477dbbdf120cace1c069a5 to your computer and use it in GitHub Desktop.
pike: timeout wheel integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const mem = std.mem; | |
const meta = std.meta; | |
const math = std.math; | |
const debug = std.debug; | |
const testing = std.testing; | |
const assert = debug.assert; | |
/// Default timer wheel: 4 wheels of 6 bits (64 slots) each, which makes
/// `Wheel.Unit` a 24-bit unsigned integer.
pub const Wheel = GenericWheel(4, 6);

/// Returns a hierarchical timer wheel type with `num_wheels` wheels of
/// `1 << num_bits_per_wheel` slots each.
///
/// Timers are intrusive: a scheduled `Timer` is linked by pointer into one of
/// the wheel's lists and points back at that list through `Timer.parent`, so
/// neither the timer nor the wheel may be moved or freed while the timer is
/// scheduled.
pub fn GenericWheel(comptime num_wheels: comptime_int, comptime num_bits_per_wheel: comptime_int) type {
    comptime const num_wheel_slots = 1 << num_bits_per_wheel;

    return struct {
        /// Time value: an unsigned integer of `num_wheels * num_bits_per_wheel` bits.
        pub const Unit = meta.Int(.unsigned, num_wheels * num_bits_per_wheel);

        /// Intrusive timer entry owned by the caller.
        pub const Timer = struct {
            /// List currently holding `node` (a wheel slot or the expired
            /// list), or null while the timer is not scheduled.
            parent: ?*List = null,
            node: List.Node = .{},
            /// Absolute expiry time last given to the wheel for this timer.
            expires: Unit = 0,
        };

        /// Bitmask with one bit per slot of a single wheel.
        const SlotMask = math.IntFittingRange(0, (1 << num_wheel_slots) - 1);
        /// Index of a wheel.
        const Number = math.IntFittingRange(0, num_wheels - 1);
        /// Index of a slot within a wheel.
        const Slot = math.IntFittingRange(0, num_wheel_slots - 1);
        const Self = @This();

        /// Per-wheel, per-slot lists of pending timers.
        wheels: [num_wheels][num_wheel_slots]List = init: {
            break :init [_][num_wheel_slots]List{[_]List{.{}} ** num_wheel_slots} ** num_wheels;
        },
        /// Per-wheel bitmask; bit `s` is set while `wheels[w][s]` is non-empty.
        pending: [num_wheels]SlotMask = init: {
            break :init [_]SlotMask{0} ** num_wheels;
        },
        /// Timers whose expiry time has been reached and that await `get()`.
        expired: List = .{},
        /// Current time of the wheel, advanced by `update()` / `step()`.
        current: Unit = 0,

        /// Schedules `timer` to expire at the absolute time `expires`.
        /// A timer that is already scheduled is moved.
        pub inline fn scheduleUntil(self: *Self, timer: *Timer, expires: Unit) void {
            self.schedule(timer, expires);
        }

        /// Schedules `timer` to expire `duration` units after the current time.
        pub inline fn scheduleFor(self: *Self, timer: *Timer, duration: Unit) void {
            self.schedule(timer, self.current + duration);
        }

        /// Unlinks `timer` from the wheel; a no-op when it is not scheduled.
        pub inline fn cancel(self: *Self, timer: *Timer) void {
            self.remove(timer);
        }

        /// Advances the current time by `duration` units.
        pub inline fn step(self: *Self, duration: Unit) void {
            self.update(self.current + duration);
        }

        /// Pops one expired timer, or returns null when none is waiting.
        /// The returned timer is fully detached and may be rescheduled.
        pub fn get(self: *Self) ?*Timer {
            const node = self.expired.pop() orelse return null;
            const timer = @fieldParentPtr(Timer, "node", node);
            // `pop()` already unlinked the node; only the back-pointer and
            // the node's own links are left to reset.
            detach(timer);
            return timer;
        }

        /// Returns 0 when expired timers are waiting, null when nothing is
        /// scheduled, and otherwise the number of units until the earliest
        /// pending slot is due to be processed by `update()`.
        pub fn next(self: *const Self) ?Unit {
            if (self.hasExpiredEntries()) {
                return 0;
            } else if (!self.hasPendingEntries()) {
                return null;
            }

            var timeout = ~@as(Unit, 0);

            comptime var wheel = 0;
            // Selects the bits of `current` consumed by the wheels below `wheel`.
            comptime var iterator_mask = @as(Unit, 0);
            inline while (wheel < num_wheels) : (wheel += 1) {
                comptime const wheel_offset = @as(math.Log2Int(Unit), wheel) * num_bits_per_wheel;

                const mask: SlotMask = self.pending[wheel];
                if (mask != 0) {
                    const slot: SlotMask = @truncate(Slot, self.current >> wheel_offset);
                    // Higher wheels file timers one slot early (see
                    // `getWheelSlot`), so add that slot back here.
                    const rotate = @as(u1, if (wheel != 0) 1 else 0);
                    const progress = @as(Unit, @ctz(SlotMask, math.rotr(SlotMask, mask, slot)) + rotate) << wheel_offset;
                    // Discount how far the lower wheels have already advanced.
                    timeout = math.min(progress - (self.current & iterator_mask), timeout);
                }

                iterator_mask <<= num_bits_per_wheel;
                iterator_mask |= (num_wheel_slots - 1);
            }

            return timeout;
        }

        /// Advances the wheel to the absolute time `current`, moving every
        /// timer whose slot elapsed either to the expired list or to a
        /// lower wheel. `current` must not be earlier than `self.current`.
        pub fn update(self: *Self, current: Unit) void {
            var elapsed: Unit = current - self.current;
            var todo: List = .{};

            comptime var wheel = 0;
            inline while (wheel < num_wheels) : (wheel += 1) {
                comptime const wheel_offset = @as(math.Log2Int(Unit), wheel) * num_bits_per_wheel;

                // Construct a bitmask where all set bits represent the elapsed window of
                // slot indices of the timer wheel.
                const elapsed_slots: SlotMask = mask: {
                    const elapsed_wheel_offset = elapsed >> wheel_offset;
                    if (elapsed_wheel_offset >= num_wheel_slots) {
                        // A full rotation (or more) elapsed: every slot is affected.
                        break :mask ~@as(SlotMask, 0);
                    } else {
                        const elapsed_pos = @truncate(Slot, elapsed_wheel_offset);
                        const old_pos = @truncate(Slot, self.current >> wheel_offset);
                        const new_pos = @truncate(Slot, current >> wheel_offset);

                        const elapsed_bit = (@as(SlotMask, 1) << @as(math.Log2Int(SlotMask), elapsed_pos)) - 1;
                        const old_bit_pos = math.rotl(SlotMask, elapsed_bit, @as(SlotMask, old_pos));
                        const diff_bit_pos = math.rotl(SlotMask, elapsed_bit, @as(SlotMask, new_pos));
                        const new_bit_pos = math.rotr(SlotMask, diff_bit_pos, @as(SlotMask, elapsed_pos));
                        const end_bit_pos = @as(SlotMask, 1) << new_pos;

                        break :mask old_bit_pos | new_bit_pos | end_bit_pos;
                    }
                };

                // Collect all timers sitting in elapsed slots. Only slots that
                // are both elapsed and pending are drained.
                while (elapsed_slots & self.pending[wheel] != 0) {
                    const slot = @truncate(Slot, @ctz(SlotMask, elapsed_slots & self.pending[wheel]));
                    todo.concat(&self.wheels[wheel][slot]);
                    self.unmarkPending(wheel, slot);
                }

                // Unless this wheel passed through slot 0, the next wheel did not move.
                if (elapsed_slots & 0x1 == 0) {
                    break;
                }

                elapsed = math.max(elapsed, @as(Unit, num_wheel_slots) << wheel_offset);
            }

            self.current = current;

            // Reschedule all collected timers against the new current time.
            while (todo.pop()) |node| {
                const timer = @fieldParentPtr(Timer, "node", node);
                // The node now belonged to `todo`, not to `timer.parent`; clear
                // the stale back-pointer and links so `schedule()` does not
                // try to unlink it from the (already drained) slot list.
                detach(timer);
                self.schedule(timer, timer.expires);
            }
        }

        /// Returns true when at least one wheel slot holds a timer.
        pub inline fn hasPendingEntries(self: *const Self) bool {
            comptime var wheel = 0;
            inline while (wheel < num_wheels) : (wheel += 1) {
                if (self.pending[wheel] != 0) return true;
            }
            return false;
        }

        /// Returns true when expired timers are waiting to be fetched with `get()`.
        pub inline fn hasExpiredEntries(self: *const Self) bool {
            return !self.expired.empty();
        }

        /// Resets the back-pointer and links of a timer whose node has already
        /// been unlinked from whatever list held it.
        inline fn detach(timer: *Timer) void {
            timer.parent = null;
            timer.node = .{};
        }

        /// Unlinks `timer` from the list it is in (if any) and clears the
        /// slot's pending bit when that slot became empty.
        fn remove(self: *Self, timer: *Timer) void {
            if (timer.parent) |parent| {
                parent.remove(&timer.node);

                // Remove from pending list if it's the last entry.
                if (parent != &self.expired and parent.empty()) {
                    // Recover (wheel, slot) from the list's position inside `wheels`.
                    const index = (@ptrToInt(parent) - @ptrToInt(&self.wheels[0][0])) / @sizeOf(List);
                    const wheel = @intCast(Number, index / num_wheel_slots);
                    const slot = @intCast(Slot, index % num_wheel_slots);
                    self.unmarkPending(wheel, slot);
                }

                detach(timer);
            }
        }

        /// Links `timer` into the expired list or into the (wheel, slot) list
        /// matching `expires`, unlinking it first if it was scheduled.
        fn schedule(self: *Self, timer: *Timer, expires: Unit) void {
            self.remove(timer);

            timer.expires = expires;

            // If the expiry time elapses the current time, append it to the
            // expired timer list.
            if (expires <= self.current) {
                self.expired.append(&timer.node);
                timer.parent = &self.expired;
                return;
            }

            // Else, append it to its respective (wheel, slot) pending timer
            // list.
            const remaining: Unit = expires - self.current;
            const wheel = getWheelNum(remaining);
            const slot = getWheelSlot(wheel, expires);

            self.wheels[wheel][slot].append(&timer.node);
            timer.parent = &self.wheels[wheel][slot];
            self.markPending(wheel, slot);
        }

        /// "Find last set": 1-based index of the highest set bit of `x`, or 0 for `x == 0`.
        inline fn fls(x: anytype) @TypeOf(x) {
            return meta.bitCount(@TypeOf(x)) - @clz(@TypeOf(x), x);
        }

        /// Returns the wheel that holds a timer with `time` units remaining.
        /// `time` must be non-zero.
        inline fn getWheelNum(time: Unit) Number {
            assert(time != 0);
            const wheel: Unit = fls(math.min(time, math.maxInt(Unit))) - 1;
            return @truncate(Number, @intCast(Slot, wheel) / num_bits_per_wheel);
        }

        /// Returns the slot of `wheel` for the absolute expiry time `time`.
        /// On every wheel but the lowest the timer is filed one slot early.
        inline fn getWheelSlot(wheel: Number, time: Unit) Slot {
            const offset = @as(math.Log2Int(Unit), wheel) * num_bits_per_wheel;
            const rotate = @as(u1, if (wheel != 0) 1 else 0);
            return @truncate(Slot, (time >> offset) - rotate);
        }

        /// Sets the pending bit of (`wheel`, `slot`).
        inline fn markPending(self: *Self, wheel: Number, slot: Slot) void {
            self.pending[wheel] |= @as(SlotMask, 1) << @as(math.Log2Int(SlotMask), slot);
        }

        /// Clears the pending bit of (`wheel`, `slot`) and nothing else.
        inline fn unmarkPending(self: *Self, wheel: Number, slot: Slot) void {
            // Shift first, then invert: `~1 << slot` would also clear every
            // bit below `slot` because unary `~` binds tighter than `<<`.
            self.pending[wheel] &= ~(@as(SlotMask, 1) << @as(math.Log2Int(SlotMask), slot));
        }
    };
}
/// Minimal intrusive doubly-linked list. The list never allocates; callers
/// own the nodes and must keep them alive while they are linked.
pub const List = struct {
    pub const Node = struct {
        next: ?*Node = null,
        prev: ?*Node = null,
    };

    head: ?*Node = null,
    tail: ?*Node = null,

    /// Returns true when the list holds no nodes.
    pub inline fn empty(self: *const List) bool {
        return self.head == null;
    }

    /// Moves every node of `other` to the end of `self`, leaving `other`
    /// empty. Concatenating an empty `other` leaves `self` untouched.
    pub inline fn concat(self: *List, other: *List) void {
        // Without this guard an empty `other` would overwrite `self.tail`
        // with null and silently truncate `self`.
        const other_head = other.head orelse return;

        if (self.tail) |tail| {
            tail.next = other_head;
            other_head.prev = tail;
        } else {
            self.head = other_head;
        }
        self.tail = other.tail;

        other.head = null;
        other.tail = null;
    }

    /// Links `node` in front of the current head. Any previous link values
    /// stored in `node` are overwritten.
    pub inline fn prepend(self: *List, node: *Node) void {
        node.prev = null;
        node.next = self.head;
        if (self.head) |head| {
            head.prev = node;
        } else {
            self.tail = node;
        }
        self.head = node;
    }

    /// Links `node` behind the current tail. Any previous link values
    /// stored in `node` are overwritten.
    pub inline fn append(self: *List, node: *Node) void {
        node.next = null;
        node.prev = self.tail;
        if (self.tail) |tail| {
            tail.next = node;
        } else {
            self.head = node;
        }
        self.tail = node;
    }

    /// Unlinks and returns the head node, or null when the list is empty.
    /// The returned node's own `next`/`prev` fields are left as they were.
    pub inline fn shift(self: *List) ?*Node {
        const node = self.head orelse return null;
        self.head = node.next;
        if (node.next) |next| {
            next.prev = null;
        } else {
            self.tail = null;
        }
        return node;
    }

    /// Unlinks and returns the tail node, or null when the list is empty.
    /// The returned node's own `next`/`prev` fields are left as they were.
    pub inline fn pop(self: *List) ?*Node {
        const node = self.tail orelse return null;
        self.tail = node.prev;
        if (node.prev) |prev| {
            prev.next = null;
        } else {
            self.head = null;
        }
        return node;
    }

    /// Unlinks `node`, which must currently be linked into `self`.
    /// The node's own `next`/`prev` fields are left as they were.
    pub inline fn remove(self: *List, node: *Node) void {
        if (node.prev) |prev| {
            prev.next = node.next;
        } else {
            self.head = node.next;
        }
        if (node.next) |next| {
            next.prev = node.prev;
        } else {
            self.tail = node.prev;
        }
    }
};
// A timer 4095 units away from time 0 belongs on the second wheel (index 1),
// filed in slot 62.
test "Wheel.getWheelNum() / Wheel.getWheelSlot()" {
    const now: Wheel.Unit = 0;
    const deadline: Wheel.Unit = 4095;

    const num = Wheel.getWheelNum(deadline - now);
    const slot = Wheel.getWheelSlot(num, deadline);

    testing.expectEqual(@as(Wheel.Number, 1), num);
    testing.expectEqual(@as(Wheel.Slot, 62), slot);
}
// Three timers due at 1, 3 and 5: the first expires on update(1), the second
// is cancelled, and the third expires after stepping 4 more units.
test "Wheel: scheduleUntil() / remove() / update() / step()" {
    var wheel = Wheel{};

    var timers = [_]Wheel.Timer{.{}} ** 3;
    const deadlines = [_]Wheel.Unit{ 1, 3, 5 };
    for (timers) |*timer, idx| {
        wheel.scheduleUntil(timer, deadlines[idx]);
    }

    wheel.update(1);
    testing.expectEqual(@as(?*Wheel.Timer, &timers[0]), wheel.get());
    testing.expectEqual(@as(?Wheel.Unit, 2), wheel.next());

    wheel.cancel(&timers[1]);
    testing.expectEqual(@as(?Wheel.Unit, 4), wheel.next());

    wheel.step(4);
    testing.expect(!wheel.hasPendingEntries());
    testing.expect(wheel.hasExpiredEntries());
}
// Every deadline that fits on the lowest wheel (1..63) must be reported by
// next() and must expire when the wheel is advanced exactly that far.
test "Wheel: scheduleUntil() / step() / update() over lowest-order timer wheel" {
    var deadline: Wheel.Unit = 1;
    while (deadline < 64) : (deadline += 1) {
        var wheel = Wheel{};
        var timer = Wheel.Timer{};

        wheel.scheduleUntil(&timer, deadline);
        wheel.update(0);

        testing.expectEqual(@as(?Wheel.Unit, deadline), wheel.next());
        testing.expect(wheel.hasPendingEntries());
        testing.expect(!wheel.hasExpiredEntries());

        // Odd deadlines advance by absolute time, even ones by relative step.
        if (deadline % 2 != 0) {
            wheel.update(deadline);
        } else {
            wheel.step(wheel.next() orelse unreachable);
        }

        testing.expectEqual(@as(?Wheel.Unit, 0), wheel.next());
        testing.expect(!wheel.hasPendingEntries());
        testing.expect(wheel.hasExpiredEntries());
        testing.expectEqual(@as(?*Wheel.Timer, &timer), wheel.get());
    }
}
// Fills one slot of the second (cap = 64) and third (cap = 4096) wheel with
// `cap` timers, then steps past the whole slot and drains the expired list.
test "Wheel: scheduleUntil() / step() / update() over higher-order timer wheel" {
    inline for (.{ 64, 4096 }) |cap| {
        var multiple: Wheel.Unit = 1;
        while (multiple < 64) : (multiple += 1) {
            const base: Wheel.Unit = multiple * cap;

            var wheel = Wheel{};
            var timers = [_]Wheel.Timer{.{}} ** cap;

            for (timers) |*timer, pos| {
                wheel.scheduleUntil(timer, base + @truncate(Wheel.Unit, pos));
                wheel.update(0);

                testing.expectEqual(@as(?Wheel.Unit, base), wheel.next());
                testing.expect(wheel.hasPendingEntries());
                testing.expect(!wheel.hasExpiredEntries());
            }

            wheel.step(base + cap);

            testing.expectEqual(@as(?Wheel.Unit, 0), wheel.next());
            testing.expect(!wheel.hasPendingEntries());
            testing.expect(wheel.hasExpiredEntries());

            for (timers) |_| {
                testing.expect(wheel.get() != null);
            }

            testing.expect(!wheel.hasPendingEntries());
            testing.expect(!wheel.hasExpiredEntries());
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const pike = @import("pike"); | |
const TimeoutWheel = @import("timeout.zig").Wheel; | |
const TimeoutList = @import("timeout.zig").List; | |
const time = std.time; | |
/// Timeout wheel for pike tasks: wraps a `TimeoutWheel` behind a mutex, keeps
/// time with a `std.time.Timer`, and posts to a `pike.Event` whenever a
/// timeout is scheduled.
pub const Wheel = struct {
    /// One pending wait. The caller owns it and must keep it alive until the
    /// waiting task has been resumed.
    pub const Timeout = struct {
        inner: TimeoutWheel.Timer = .{},
        task: pike.Task = undefined,
        /// Set when the wait was cancelled (or the wheel torn down) rather
        /// than expired.
        dead: bool = false,
    };

    lock: std.Mutex = .{},
    inner: TimeoutWheel = .{},
    timer: time.Timer,
    event: pike.Event,

    /// Starts the clock and creates the wake-up event.
    pub fn init() !Wheel {
        const timer = try time.Timer.start();
        const event = try pike.Event.init();
        return Wheel{ .timer = timer, .event = event };
    }

    /// Resumes every task that is still waiting, flagged as cancelled, and
    /// releases the event.
    pub fn deinit(self: *Wheel) void {
        var todo: TimeoutList = .{};

        {
            const held = self.lock.acquire();
            defer held.release();

            // Skip empty lists so that gathering never depends on how the
            // list implementation treats an empty source.
            for (self.inner.wheels) |*slots| {
                for (slots) |*list| {
                    if (!list.empty()) todo.concat(list);
                }
            }

            if (!self.inner.expired.empty()) todo.concat(&self.inner.expired);
        }

        while (todo.pop()) |node| {
            const inner = @fieldParentPtr(TimeoutWheel.Timer, "node", node);
            const timeout = @fieldParentPtr(Timeout, "inner", inner);

            timeout.dead = true;
            pike.dispatch(&timeout.task, .{});
        }

        self.event.deinit();
    }

    /// Registers the wake-up event with `notifier`.
    pub inline fn registerTo(self: *Wheel, notifier: *const pike.Notifier) !void {
        try self.event.registerTo(notifier);
    }

    /// Cancels `timeout`. If it was still scheduled, its task is resumed and
    /// observes `error.OperationCancelled`.
    pub fn cancel(self: *Wheel, timeout: *Timeout) void {
        var pending = false;

        {
            const held = self.lock.acquire();
            defer held.release();

            pending = timeout.inner.parent != null;
            // The inner wheel works on its own `Timer`, not on our wrapper.
            self.inner.cancel(&timeout.inner);
        }

        if (pending) {
            timeout.dead = true;
            pike.dispatch(&timeout.task, .{});
        }
    }

    /// Suspends the caller for `duration_ms` milliseconds of wheel time.
    pub inline fn waitFor(self: *Wheel, duration_ms: usize) !void {
        var timeout: Timeout = .{};
        try self.waitForAsync(&timeout, duration_ms);
    }

    /// Suspends the caller until the wheel time reaches `time_ms`.
    pub inline fn waitUntil(self: *Wheel, time_ms: usize) !void {
        var timeout: Timeout = .{};
        try self.waitUntilAsync(&timeout, time_ms);
    }

    /// Like `waitFor`, but with a caller-provided `timeout` so the wait can
    /// be cancelled. `duration_ms` is truncated to `TimeoutWheel.Unit`.
    /// Returns `error.OperationCancelled` when the wait was cancelled.
    pub fn waitForAsync(self: *Wheel, timeout: *Timeout, duration_ms: usize) !void {
        timeout.task = pike.Task.init(@frame());

        var frame: @Frame(pike.Event.post) = undefined;

        suspend {
            const held = self.lock.acquire();
            defer held.release();

            const units = @truncate(TimeoutWheel.Unit, duration_ms);
            self.inner.scheduleFor(&timeout.inner, units);

            // Post to the event after scheduling; presumably this wakes the
            // poller so it recomputes its deadline - TODO confirm against pike.
            frame = async self.event.post();
        }

        if (timeout.dead) {
            return error.OperationCancelled;
        }

        try await frame;
    }

    /// Like `waitUntil`, but with a caller-provided `timeout` so the wait can
    /// be cancelled. `time_ms` is truncated to `TimeoutWheel.Unit`.
    /// Returns `error.OperationCancelled` when the wait was cancelled.
    pub fn waitUntilAsync(self: *Wheel, timeout: *Timeout, time_ms: usize) !void {
        timeout.task = pike.Task.init(@frame());

        var frame: @Frame(pike.Event.post) = undefined;

        suspend {
            const held = self.lock.acquire();
            defer held.release();

            const units = @truncate(TimeoutWheel.Unit, time_ms);
            self.inner.scheduleUntil(&timeout.inner, units);

            // Post to the event after scheduling; presumably this wakes the
            // poller so it recomputes its deadline - TODO confirm against pike.
            frame = async self.event.post();
        }

        if (timeout.dead) {
            return error.OperationCancelled;
        }

        try await frame;
    }

    /// Returns what the inner wheel's `next()` reports, read under the lock.
    pub inline fn next(self: *Wheel) ?TimeoutWheel.Unit {
        const held = self.lock.acquire();
        defer held.release();

        return self.inner.next();
    }

    /// Advances the wheel to the current clock reading and resumes every
    /// task whose timeout expired.
    pub inline fn update(self: *Wheel) void {
        var todo: TimeoutList = .{};

        {
            const held = self.lock.acquire();
            defer held.release();

            // Derive the wheel time from the total elapsed time instead of
            // summing truncated laps: `lap() / ns_per_ms` throws away the
            // sub-millisecond remainder on every call, so calling update()
            // often would make the wheel clock run slow or stand still.
            const now_ms = self.timer.read() / time.ns_per_ms;
            self.inner.update(@truncate(TimeoutWheel.Unit, now_ms));

            while (self.inner.get()) |timeout| todo.append(&timeout.node);
        }

        // Dispatch outside the lock so resumed tasks may use the wheel again.
        while (todo.pop()) |node| {
            const inner = @fieldParentPtr(TimeoutWheel.Timer, "node", node);
            const timeout = @fieldParentPtr(Timeout, "inner", inner);
            pike.dispatch(&timeout.task, .{});
        }
    }
};
/// Waits on `wheel` until its clock reaches `duration_ms`, then logs it.
pub inline fn waitUntil(wheel: *Wheel, comptime duration_ms: comptime_int) !void {
    const deadline_ms: usize = duration_ms;
    try wheel.waitUntil(deadline_ms);

    std.debug.print("{} ms has elapsed!\n", .{duration_ms});
}
/// Demo sequence: waits for 1000 ms, then for 1100..2000 ms in 100 ms steps,
/// then for 3000, 4000 and 5000 ms (all absolute wheel times).
pub fn run(wheel: *Wheel) !void {
    try waitUntil(wheel, 1000);

    comptime var step = 1;
    inline while (step <= 10) : (step += 1) {
        try waitUntil(wheel, 1000 + step * 100);
    }

    inline for (.{ 3000, 4000, 5000 }) |deadline_ms| {
        try waitUntil(wheel, deadline_ms);
    }
}
/// Demo entry point: drives the timeout wheel from a pike notifier until no
/// timeouts are left, then collects the result of `run`.
pub fn main() !void {
    const notifier = try pike.Notifier.init();
    defer notifier.deinit();

    var wheel = try Wheel.init();
    defer wheel.deinit();

    try wheel.registerTo(&notifier);

    var frame = async run(&wheel);

    while (true) {
        wheel.update();
        // `next()` returns null once nothing is scheduled, which ends the loop.
        try notifier.poll(wheel.next() orelse break);
        wheel.update();
    }

    try nosuspend await frame;

    std.debug.print("Done!\n", .{});
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment