Skip to content

Instantly share code, notes, and snippets.

@kprotty
Created July 29, 2020 13:49
Show Gist options
  • Save kprotty/a46ab161136c9ad7d903203fb11a9ce3 to your computer and use it in GitHub Desktop.
Save kprotty/a46ab161136c9ad7d903203fb11a9ce3 to your computer and use it in GitHub Desktop.
Zig nanocoroutines
// Bit-vector + rand shuffle order scheduler
const std = @import("std");
const Allocator = std.mem.Allocator;
const workload_size = 1 << 20;
const n_workloads = 4;
const repetition = 4;
const use_async = true;
const use_manual_prefetch = true;
const Cell = extern struct {
next_index: u64,
padding: [7]u64 = [_]u64{0} ** 7,
};
fn initCells(allocator: *Allocator, size: usize) ![]Cell {
const order = try allocator.alloc(u64, size - 1);
defer allocator.free(order);
for (order) |*e, i| e.* = i + 1;
var rnd = std.rand.DefaultPrng.init(@bitCast(usize, std.time.milliTimestamp()));
rnd.random.shuffle(u64, order);
const cells = try allocator.alloc(Cell, size);
var prev: u64 = 0;
for (order) |i| {
cells[prev] = .{.next_index = i};
prev = i;
}
cells[prev] = .{.next_index = size};
return cells;
}
fn syncSum(workloads: *const [n_workloads][]const Cell) u64 {
var sum: u64 = 0;
for (workloads) |workload| {
var i: u64 = 0;
while (i != workload.len) {
i = workload[i].next_index;
sum += i;
}
}
return sum;
}
fn asyncWorker(workload: []const Cell, ready: *bool) u64 {
var partial_sum: u64 = 0;
var i: u64 = 0;
while (i != workload.len) {
if (use_manual_prefetch) {
asm volatile("prefetcht0 (%[addr])" :: [addr] "r" (&workload[i].next_index));
suspend;
i = workload[i].next_index;
partial_sum += i;
} else {
i = workload[i].next_index;
suspend;
partial_sum += i;
}
}
ready.* = true;
return partial_sum;
}
fn asyncSum(workloads: *const [n_workloads][]const Cell) u64 {
var frames: [n_workloads]@Frame(asyncWorker) = undefined;
var ready = [_]bool{false} ** n_workloads;
for (frames) |*f, i| f.* = async asyncWorker(workloads[i], &ready[i]);
while (true) {
var any_unready = false;
for (frames) |*f, i| {
if (!ready[i]) {
resume f;
any_unready = true;
}
}
if (!any_unready) {
break;
}
}
var sum: u64 = 0;
for (frames) |*f| {
sum += nosuspend await f;
}
return sum;
}
pub fn main() !void {
const allocator = std.heap.page_allocator;
var workloads = [_][]Cell{&[_]Cell{}} ** n_workloads;
defer for (workloads) |w| allocator.free(w);
for (workloads) |*w| w.* = try initCells(allocator, workload_size);
var i: usize = 0;
while (i < repetition) : (i += 1) {
var timer = try std.time.Timer.start();
const sum = if (use_async) asyncSum(&workloads) else syncSum(&workloads);
var dt = timer.read();
std.debug.print("sum: {}, {} ms\n", .{sum, dt / 1_000_000});
}
}
// Linked-list + lemire rng shuffle scheduler
const std = @import("std");
const workload_size = 1 << 20;
const n_workloads = 4;
const repetitions = 4;
const use_async = true;
const use_manual_prefetch = true;
pub fn main() !void {
const allocator = std.heap.page_allocator;
var allocated: usize = 0;
var workloads = [_][]Cell{&[_]Cell{}} ** n_workloads;
defer for (workloads[0..allocated]) |workload|
allocator.free(workload);
for (workloads) |*workload| {
workload.* = try Cell.init(allocator);
allocated += 1;
}
var i: usize = 0;
var timer = try std.time.Timer.start();
while (i < repetitions) : (i += 1) {
const start = timer.read();
const sum = getSum(&workloads);
const elapsed = timer.read() - start;
std.debug.print("sum: {}, {} ms\n", .{elapsed, elapsed / std.time.ns_per_ms});
}
}
const Cell = extern struct {
next_index: u64 align(64),
fn init(allocator: *std.mem.Allocator) ![]Cell {
const coprime = blk: {
var i: u64 = workload_size / 2;
while (i < workload_size) : (i += 1) {
if (gcd(i, workload_size) == 1)
break :blk i;
}
unreachable;
};
const cells = try allocator.alloc(Cell, workload_size);
const offset = @bitCast(u64, std.time.milliTimestamp());
var i: u64 = 1;
var prev = ((0 * coprime) + offset) % workload_size;
while (i < (workload_size + 1)) : (i += 1) {
const index = ((i * coprime) + offset) % workload_size;
cells[prev] = .{ .next_index = index };
prev = @intCast(usize, index);
}
return cells;
}
fn gcd(a: u64, b: u64) u64 {
var u = a;
var v = b;
if (u == 0) return v;
if (v == 0) return u;
const Shift = std.math.Log2Int(u64);
const shift = @intCast(Shift, @ctz(u64, u | v));
u >>= @intCast(Shift, @ctz(u64, u));
while (true) {
v >>= @intCast(Shift, @ctz(u64, v));
if (u > v) {
const t = v;
v = u;
u = t;
}
v = v - u;
if (v == 0)
break;
}
return u << shift;
}
};
fn getSum(workloads: *const [n_workloads][]const Cell) u64 {
var sum: u64 = 0;
if (use_async) {
var frames: [n_workloads]@Frame(worker) = undefined;
for (workloads) |workload, i|
frames[i] = async worker(&sum, workload);
while (Task.poll()) |task|
resume task.frame;
} else {
for (workloads) |workload|
worker(&sum, workload);
}
return sum;
}
fn worker(sum: *u64, workload: []const Cell) void {
var i: u64 = 0;
var j: usize = workload.len;
while (j != 0) : (j -= 1) {
const ptr = &workload[i].next_index;
i = if (use_async) asyncLoad(ptr) else ptr.*;
sum.* += i;
}
if (use_async) {
suspend;
}
}
fn asyncLoad(ptr: anytype) @TypeOf(ptr.*) {
var task = Task{ .frame = @frame() };
suspend {
task.ready();
if (use_manual_prefetch)
asm volatile("prefetcht0 (%[ptr])" :: [ptr] "r" (ptr));
}
return ptr.*;
}
const Task = struct {
next: ?*Task = undefined,
frame: anyframe,
var head: ?*Task = null;
var tail: *Task = undefined;
fn ready(task: *Task) void {
if (head != null)
tail.next = task;
tail = task;
task.next = null;
head = head orelse task;
}
fn poll() ?*Task {
const task = head orelse return null;
head = task.next;
return task;
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment