Created
June 28, 2021 13:22
-
-
Save mratsim/cf4392bbd1c98640a812db7d9ec14a95 to your computer and use it in GitHub Desktop.
Debugging chase_lev_deques
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Nim-Taskpools | |
# Copyright (c) 2021 Status Research & Development GmbH | |
# Licensed and distributed under either of | |
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). | |
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). | |
# at your option. This file may not be copied, modified, or distributed except according to those terms. | |
# chase_lev_deques.nim | |
# -------------------- | |
# This file implements a Chase-Lev deque | |
# This is a single-consumer multi-consumer concurrent queue | |
# for work-stealing schedulers. | |
# | |
# Papers: | |
# - Dynamic Circular Work-Stealing Deque | |
# David Chase, Yossi Lev, 1993 | |
# https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf | |
# | |
# - Correct and Efficient Work-Stealing for Weak Memory Models | |
# Nhat Minh Lê, Antoniu Pop, Albert Cohen, Francesco Zappa Nardelli, 2013 | |
# https://fzn.fr/readings/ppopp13.pdf | |
# | |
# We straight translate the second paper which includes formal proofs of correctness, | |
# and uses modern C++11 code. | |
# | |
# A Chase-lev dequeue implements the following push, pop, steal. | |
# | |
# top bottom | |
# --------------------------------- | |
# | | | | <- push() | |
# steal() <- | Task 0 | Task 1 | Task 2 | -> pop() | |
# any thread | | | | owner-only | |
# --------------------------------- | |
# | |
# To reduce contention, stealing is done on the opposite end from push/pop | |
# so that there is a race only for the very last task. | |
{.push raises: [].} | |
import | |
system/ansi_c, | |
std/[locks, typetraits, atomics], | |
./instrumentation/[contracts, loggers] | |
type | |
Buf[T] = object | |
## Backend buffer of a ChaseLevDeque | |
## `capacity` MUST be a power of 2 | |
capacity: int | |
mask: int # == capacity-1 implies (i and mask) == (i mod capacity) | |
rawBuffer: UncheckedArray[Atomic[T]] | |
ChaseLevDeque*[T] = object | |
## This implements a lock-free, growable, work-stealing deque. | |
## The owning thread enqueues and dequeues at the bottom | |
## Foreign threads steal at the top. | |
## | |
## Default queue size is 8 | |
## Queue can grow to handle up to 34 359 738 368 tasks in flights | |
## TODO: | |
## with --gc:arc / --gc:orc, use a seq instead of a fixed max size. | |
top {.align: 64.}: Atomic[int] | |
bottom: Atomic[int] | |
buf: Atomic[ptr Buf[T]] | |
garbage: array[32, ptr Buf[T]] # up to 34 359 738 368 sized buffer | |
garbageUsed: uint8 | |
func isPowerOfTwo(n: int): bool {.inline.} = | |
(n and (n - 1)) == 0 and (n != 0) | |
proc newBuf(T: typedesc, capacity: int): ptr Buf[T] = | |
# Tasks have a destructor | |
# static: | |
# doAssert supportsCopyMem(T), $T & " must be a (POD) plain-old-data type: no seq, string, ref." | |
preCondition: capacity.isPowerOfTwo() | |
result = cast[ptr Buf[T]]( | |
c_malloc(csize_t 2*sizeof(int) + sizeof(T)*capacity) | |
) | |
result.capacity = capacity | |
result.mask = capacity - 1 | |
result.rawBuffer.addr.zeroMem(sizeof(T)*capacity) | |
proc `[]=`[T](buf: var Buf[T], index: int, item: T) {.inline.} = | |
log("pushed: task 0x%.08x at logical index %d (physical %d)\n", item, index, index and buf.mask) | |
buf.rawBuffer[index and buf.mask].store(item, moRelaxed) | |
proc `[]`[T](buf: var Buf[T], index: int): T {.inline.} = | |
result = buf.rawBuffer[index and buf.mask].load(moRelaxed) | |
log("popped: task 0x%.08x at logical index %d (physical %d)\n", result, index, index and buf.mask) | |
proc grow[T](deque: var ChaseLevDeque[T], buf: var ptr Buf[T], top, bottom: int) {.inline.} = | |
## Double the buffer size | |
## bottom is the last item index | |
## | |
## To handle race-conditions the current "top", "bottom" and "buf" | |
## have to be saved before calling this procedure. | |
## It reads and writes the "deque.buf", "deque.garbage" and "deque.garbageUsed" | |
# Read -> Copy -> Update | |
var tmp = newBuf(T, buf.capacity*2) | |
for i in top ..< bottom: | |
tmp[][i] = buf[][i] | |
# This requires 68+ billions tasks in flight (per-thread) | |
ascertain: deque.garbageUsed.int < deque.garbage.len | |
deque.garbage[deque.garbageUsed] = buf | |
swap(buf, tmp) | |
deque.buf.store(buf, moRelaxed) | |
# Public API | |
# --------------------------------------------------- | |
proc init*[T](deque: var ChaseLevDeque[T]) = | |
## Initializes a new Chase-lev work-stealing deque. | |
deque.reset() | |
deque.buf.store(newBuf(T, 8), moRelaxed) | |
proc teardown*[T](deque: var ChaseLevDeque[T]) = | |
## Teardown a Chase-lev work-stealing deque. | |
for i in 0 ..< deque.garbageUsed.int: | |
c_free(deque.garbage[i]) | |
c_free(deque.buf.load(moRelaxed)) | |
proc push*[T](deque: var ChaseLevDeque[T], item: T) = | |
## Enqueue an item at the bottom | |
## The item should not be used afterwards. | |
let # Handle race conditions | |
b = deque.bottom.load(moRelaxed) | |
t = deque.top.load(moAcquire) | |
var a = deque.buf.load(moRelaxed) | |
log("push (before): using %d out of %d (bottom: %d, top: %d)\n", b-t, a.capacity, b, t) | |
if b-t > a.capacity - 1: | |
# Full queue | |
log("push: growing queue %d -> %d (buf: 0x%.08x)\n", a.capacity, a.capacity * 2, a) | |
deque.grow(a, t, b) | |
log("push: new queue buf: 0x%.08x\n", a) | |
a[][b] = item | |
log("pushed: task 0x%.08x\n", a[][b]) | |
fence(moRelease) | |
deque.bottom.store(b+1, moRelaxed) | |
proc pop*[T](deque: var ChaseLevDeque[T]): T = | |
## Deque an item at the bottom | |
let # Handle race conditions | |
b = deque.bottom.load(moRelaxed) - 1 | |
a = deque.buf.load(moRelaxed) | |
deque.bottom.store(b, moRelaxed) | |
fence(moSequentiallyConsistent) | |
var t = deque.top.load(moRelaxed) | |
log("pop (before): pending %d out of %d (bottom-1: %d, top: %d)\n", b-t, a.capacity, b, t) | |
if t <= b: | |
# Non-empty queue. | |
result = a[][b] | |
log("popped: task 0x%.08x\n", result) | |
if t == b: | |
# Single last element in queue. | |
if not compare_exchange(deque.top, t, t+1, moSequentiallyConsistent, moRelaxed): | |
# Failed race. | |
result = default(T) | |
deque.bottom.store(b+1, moRelaxed) | |
else: | |
# Empty queue. | |
result = default(T) | |
deque.bottom.store(b+1, moRelaxed) | |
proc steal*[T](deque: var ChaseLevDeque[T]): T = | |
## Deque an item at the top | |
var t = deque.top.load(moAcquire) | |
fence(moSequentiallyConsistent) | |
let b = deque.bottom.load(moAcquire) | |
result = default(T) | |
log("steal: pending %d (bottom: %d, top: %d)\n", b-t, b, t) | |
if t < b: | |
# Non-empty queue. | |
let a = deque.buf.load(moConsume) | |
result = a[][t] | |
if not compare_exchange(deque.top, t, t+1, moSequentiallyConsistent, moRelaxed): | |
# Failed race. | |
return default(T) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment