Last active
November 24, 2022 17:52
-
-
Save rlapz/b6a7e6e91f08eff25f8e67b68627e630 to your computer and use it in GitHub Desktop.
BufferQueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const Allocator = std.mem.Allocator; | |
const Thread = std.Thread; | |
const Mutex = Thread.Mutex; | |
const Condition = Thread.Condition; | |
/// Returns a fixed-capacity FIFO queue type holding up to `size` items of `T`.
///
/// The queue is a ring buffer whose state is guarded by one mutex and one
/// condition variable, so every method may be called from several threads.
/// `kill()` marks the queue dead and wakes every blocked waiter.
///
/// `size` must be at least 1; a zero-capacity queue is rejected at compile time.
pub fn BufferQueue(comptime T: type, comptime size: usize) type {
    if (size == 0)
        @compileError("BufferQueue requires a capacity of at least 1");

    return struct {
        allocator: Allocator,
        /// Cleared by `kill()`; blocking calls stop waiting once it is false.
        is_alive: bool,
        /// Slot the next produced item is written to.
        head: usize,
        /// Slot holding the oldest item that has not been consumed yet.
        tail: usize,
        /// Number of items currently stored.
        count: usize,
        buffer: []T,
        condv: Condition,
        mutex: Mutex,

        const Self = @This();

        /// Allocates the `size`-slot backing buffer with `allocator`.
        /// Fails only if that allocation fails. Pair with `deinit()`.
        pub fn init(allocator: Allocator) !Self {
            return Self{
                .allocator = allocator,
                .is_alive = true,
                .head = 0,
                .tail = 0,
                .count = 0,
                .buffer = try allocator.alloc(T, size),
                .condv = .{},
                .mutex = .{},
            };
        }

        /// Kills the queue (waking all waiters) and frees the backing buffer.
        /// The caller must make sure no other thread still uses the queue
        /// afterwards, e.g. by joining worker threads first.
        pub fn deinit(self: *Self) void {
            self.kill();
            self.allocator.free(self.buffer);
        }

        /// Appends `data` without blocking.
        /// Returns `error.NoSpaceLeft` when the queue already holds `size` items.
        pub fn produce(self: *Self, data: T) !void {
            self.mutex.lock();
            defer self.mutex.unlock();

            if (self.count == size)
                return error.NoSpaceLeft;

            self.pushLocked(data);
        }

        /// Removes and returns the oldest item without blocking.
        /// Returns `error.NoData` when the queue is empty.
        ///
        /// The read and the removal happen under a single lock acquisition, so
        /// concurrent consumers can never receive the same item twice.
        pub fn consume(self: *Self) !T {
            self.mutex.lock();
            defer self.mutex.unlock();

            if (self.count == 0)
                return error.NoData;

            const item = self.buffer[self.tail];
            self.dropOldestLocked();
            return item;
        }

        /// Returns a copy of the oldest item without removing it.
        /// Returns `error.NoData` when the queue is empty.
        pub fn peek(self: *Self) !T {
            self.mutex.lock();
            defer self.mutex.unlock();

            if (self.count == 0)
                return error.NoData;

            return self.buffer[self.tail];
        }

        /// Discards the oldest item, if there is one, and signals one waiter.
        pub fn seen(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.dropOldestLocked();
        }

        /// Blocks until a slot is free (`count < size`) or the queue is killed,
        /// then appends `data`.
        ///
        /// `timeout_ns` bounds each individual wait; after a timeout the
        /// condition is re-checked and the wait resumes. If the queue has been
        /// killed, `data` is dropped and the call returns without storing it.
        pub fn produceWait(self: *Self, data: T, timeout_ns: u64) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.count == size and self.is_alive)
                self.waitLocked(timeout_ns);

            if (!self.is_alive)
                return;

            self.pushLocked(data);
        }

        /// Blocks until an item is available (`count > 0`) or the queue is
        /// killed, then removes and returns the oldest item.
        ///
        /// The wait, the read and the removal share one critical section, so
        /// concurrent consumers never receive the same item twice.
        ///
        /// If the queue is killed while empty, nothing is removed and the value
        /// currently stored in the tail slot is returned; that slot may hold an
        /// already-consumed or never-written value, so callers should check
        /// `isAlive()` / `hasItem()` before trusting the result.
        pub fn consumeWait(self: *Self, timeout_ns: u64) T {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.waitForItemLocked(timeout_ns);

            const item = self.buffer[self.tail];
            self.dropOldestLocked();
            return item;
        }

        /// Blocks until an item is available (`count > 0`) or the queue is
        /// killed, then returns a copy of the oldest item without removing it.
        ///
        /// If the queue is killed while empty, the value currently stored in
        /// the tail slot is returned; see `consumeWait` for the caveat.
        pub fn peekWait(self: *Self, timeout_ns: u64) T {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.waitForItemLocked(timeout_ns);
            return self.buffer[self.tail];
        }

        /// Reports whether `kill()` has not been called yet.
        pub fn isAlive(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();

            return self.is_alive;
        }

        /// Reports whether at least one item is stored.
        pub fn hasItem(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();

            return self.count > 0;
        }

        /// Marks the queue dead and wakes every thread blocked on it.
        pub fn kill(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.is_alive = false;
            self.condv.broadcast();
        }

        /// Advances a ring index by one slot, wrapping at `size`.
        fn nextIndex(index: usize) usize {
            const next = index + 1;
            return if (next == size) 0 else next;
        }

        /// Stores `data` at `head` and signals one waiter.
        /// The caller must hold `mutex` and have verified `count < size`.
        fn pushLocked(self: *Self, data: T) void {
            self.buffer[self.head] = data;
            self.head = nextIndex(self.head);
            self.count += 1;
            self.condv.signal();
        }

        /// Removes the oldest item if one exists, then signals one waiter.
        /// The caller must hold `mutex`.
        fn dropOldestLocked(self: *Self) void {
            if (self.count > 0) {
                self.tail = nextIndex(self.tail);
                self.count -= 1;
            }
            self.condv.signal();
        }

        /// Waits until the queue has an item or has been killed.
        /// The caller must hold `mutex`.
        fn waitForItemLocked(self: *Self, timeout_ns: u64) void {
            while (self.is_alive and self.count == 0)
                self.waitLocked(timeout_ns);
        }

        /// Performs one bounded wait on the condition variable.
        /// The caller must hold `mutex`.
        fn waitLocked(self: *Self, timeout_ns: u64) void {
            // A timeout is not a failure here: every caller re-checks its own
            // predicate in a loop, so the error is intentionally ignored.
            self.condv.timedWait(&self.mutex, timeout_ns) catch {};
        }
    };
}
// | |
// test | |
// | |
// Shorthands used by the threaded test below.
const dprint = std.debug.print;
const time = std.time;
const Atomic = std.atomic.Atomic;

/// Queue type under test: up to ten `u32` items.
const Buff = BufferQueue(u32, 10);

/// Value handed to the queue by the producer threads; shared by all of them.
var num: Atomic(u32) = Atomic(u32).init(0);
/// Producer thread body for the test: while `buf` is alive, takes the current
/// value of the shared counter `num`, advances the counter (0..9, wrapping),
/// pushes the taken value with `produceWait`, and logs it together with `id`.
fn producer(buf: *Buff, id: u32) void {
    while (true) {
        if (!buf.isAlive())
            break;

        // NOTE(review): the load and the store are two separate atomic
        // operations, so two producers may observe and push the same value.
        const value = num.load(.Acquire);
        const next: u32 = if (value == 9) 0 else value + 1;
        num.store(next, .Release);

        buf.produceWait(value, time.ns_per_s);
        dprint(" {} -> [{}]\n", .{ value, id });

        // NOTE(review): `time.sleep` takes nanoseconds, so `ms_per_s` (1000)
        // is a 1000 ns pause here, not one second — confirm the intent.
        time.sleep(time.ms_per_s);
    }

    dprint("producer: {}: Exiting...\n", .{id});
}
/// Consumer thread body for the test: repeatedly takes an item from `buf`
/// with `consumeWait`, logs it together with `id`, and sleeps one second.
/// After this thread has taken nine items it kills the queue so that the
/// producers stop.
///
/// The loop runs while the queue is alive OR still holds items. Looping only
/// on `hasItem()` would make a consumer exit as soon as the queue is
/// momentarily empty (for instance before any producer has pushed), and if
/// every consumer exits that way nobody ever calls `kill()`.
fn consumer(buf: *Buff, id: u32) void {
    var consumed: u32 = 0;

    while (buf.isAlive() or buf.hasItem()) {
        // NOTE(review): if the queue is killed while empty, `consumeWait`
        // still returns a value (whatever the tail slot holds), so the last
        // line printed by a consumer may not be a freshly produced item.
        const ret = buf.consumeWait(time.ns_per_s);
        dprint("[{}] <- {}\n", .{ id, ret });
        time.sleep(time.ns_per_s);

        consumed += 1;
        if (consumed == 9)
            buf.kill();
    }

    dprint("consumer: {}: Exiting...\n", .{id});
}
test "4 producers, 2 consumers" {
    dprint("\n", .{});

    var buf = try Buff.init(std.testing.allocator);
    defer buf.deinit();

    // Four producers followed by two consumers, in spawn order.
    var threads: [6]Thread = undefined;
    var spawned: usize = 0;

    // Join whatever was started, newest first, before `buf` is torn down.
    // This also covers the case where a later spawn fails.
    defer {
        while (spawned > 0) {
            spawned -= 1;
            threads[spawned].join();
        }
    }

    var producer_id: u32 = 0;
    while (producer_id < 4) : (producer_id += 1) {
        threads[spawned] = try Thread.spawn(.{}, producer, .{ &buf, producer_id });
        spawned += 1;
    }

    var consumer_id: u32 = 0;
    while (consumer_id < 2) : (consumer_id += 1) {
        threads[spawned] = try Thread.spawn(.{}, consumer, .{ &buf, consumer_id });
        spawned += 1;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment