Last active
December 2, 2024 02:37
-
-
Save vbe0201/2f30163415e6e99dafe3045d8d254b4f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const assert = std.debug.assert; | |
const cache_line = std.atomic.cache_line; | |
const AtomicUsize = std.atomic.Atomic(usize); | |
const Ordering = std.atomic.Ordering; | |
const Allocator = std.mem.Allocator; | |
// A modified version of Dmitry Vyukov's bounded MPMC queue, | |
// adapted for single-consumer usage: | |
// https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue | |
// | |
// Most notably, allowing only a single thread to receive at | |
// the same time greatly simplifies the dequeue logic and | |
// removes the need for some atomic operations. | |
/// A bounded MPSC queue storing `T` values. | |
pub fn Queue(comptime T: type) type { | |
return struct { | |
// Index into the buffer at which the next element | |
// to enqueue should be placed. | |
// | |
// Must be masked with `buffer_mask`. | |
enqueue_pos: AtomicUsize align(cache_line), | |
// Index into the buffer from which the next element | |
// should be obtained, if present. | |
// | |
// Must be masked with `buffer_mask`. | |
dequeue_pos: usize align(cache_line), | |
// A buffer of queue slots at which values can be | |
// placed. A sequential stamp indicates if a slot | |
// currently holds a value. | |
slots: [*]Slot, | |
// Bit mask for masking enqueue/dequeue positions | |
// and advancing slot stamps after every usage. | |
slots_mask: usize, | |
// The allocator used to allocate the slot list. | |
// Kept for later resource cleanup. | |
alloc: Allocator, | |
/// A Queue slot. | |
const Slot = struct { | |
sequence: AtomicUsize, | |
value: T = undefined, | |
}; | |
const Self = @This(); | |
/// Creates a new instance of the Queue with a given | |
/// capacity. | |
/// | |
/// The `alloc` argument is used once to allocate | |
/// `capacity` value slots. No subsequent allocations | |
/// will ever happen. | |
/// | |
/// `.deinit()` must be called later to free the | |
/// occupied resources. | |
/// | |
/// `capacity` must be at least 2 and a power of two. | |
pub fn init(alloc: Allocator, capacity: usize) !Self { | |
assert(capacity >= 2 and std.math.isPowerOfTwo(capacity)); | |
var slots = try alloc.alloc(Slot, capacity); | |
for (slots) |*slot, index| { | |
slot.* = .{ .sequence = AtomicUsize.init(index) }; | |
} | |
return .{ | |
.enqueue_pos = AtomicUsize.init(0), | |
.dequeue_pos = 0, | |
.slots = slots.ptr, | |
.slots_mask = capacity - 1, | |
.alloc = alloc, | |
}; | |
} | |
/// Frees the resources occupied by this Queue. | |
pub fn deinit(self: Self) void { | |
self.alloc.free(self.slots[0..self.len()]); | |
} | |
/// Gets the capacity of the Queue, i.e. how many | |
/// elements it can store at once. | |
pub fn len(self: *const Self) usize { | |
// We subtracted 1 from capacity during type | |
// construction. Thus, this never overflows. | |
return self.slots_mask + 1; | |
} | |
/// Attempts to push a value into the Queue, returning | |
/// whether the operation was successful. | |
pub fn tryPush(self: *Self, value: T) bool { | |
var pos = self.enqueue_pos.load(.Monotonic); | |
while (true) { | |
const slot = &self.slots[pos & self.slots_mask]; | |
const cmp = @bitCast(isize, slot.sequence.load(.Acquire) -% pos); | |
if (cmp == 0) { | |
// Stamp value matches enqueue position; | |
// we can attempt to claim this slot. | |
if (self.enqueue_pos.tryCompareAndSwap(pos, pos +% 1, .Monotonic, .Monotonic)) |old| { | |
pos = old; | |
} else { | |
// We claimed the slot, store our value. | |
slot.value = value; | |
slot.sequence.store(pos +% 1, .Release); | |
return true; | |
} | |
} else if (cmp < 0) { | |
// Stamp value is smaller than enqueue position; | |
// slot is currently filled so we can't touch it. | |
return false; | |
} else { | |
// Stamp value is greater than enqueue position; | |
// we raced with another producer that has already | |
// claimed the slot so we need to retry. | |
pos = self.enqueue_pos.load(.Monotonic); | |
} | |
} | |
} | |
/// Attempts to pop the next value off the queue. | |
/// | |
/// Returns `null` when the Queue does not store | |
/// any elements currently. | |
/// | |
/// Only one concurrent caller allowed at any time! | |
pub fn tryPop(self: *Self) ?T { | |
var pos = self.dequeue_pos; | |
const slot = &self.slots[pos & self.slots_mask]; | |
pos +%= 1; | |
// If stamp is ahead of dequeue position by one, | |
// we can take the value from the slot. | |
const stamp = slot.sequence.load(.Acquire); | |
if (stamp == pos) { | |
// Advance dequeue position to next slot. | |
self.dequeue_pos = pos; | |
// Advance the sequence count to mark the | |
// slot as unoccupied. | |
const value = slot.value; | |
slot.sequence.store(pos +% self.slots_mask, .Release); | |
return value; | |
} else { | |
return null; | |
} | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
nice zig code vale