Skip to content

Instantly share code, notes, and snippets.

@mratsim
Created January 2, 2020 21:26
Show Gist options
  • Save mratsim/7838e005f818e2cfd9c78833e7eef05b to your computer and use it in GitHub Desktop.
Save mratsim/7838e005f818e2cfd9c78833e7eef05b to your computer and use it in GitHub Desktop.
import sequtils
# Producers' side
# ----------------------------------------------------
# Note, for this example, the producer and consumer are on the same thread
# In the actual implementation, "ProducersRangePromises"
# must be atomicRefCounted and the items in fulfilled must be atomic as well.
type
ProducersRangePromises = ref object
## This is a "distributed" binary indexed tree
## that holds promises over a for-loop range.
## The producers which fulfill a promise updates this tree.
# In the threaded-case, this is easily thread-safe as only monotonic increment are used
# The main issue is false-sharing / cache-ping pong if many thread are fulfilling
# promises stored in the same cache lin, but padding would be very memory inefficient.
start, stop, stride: int32
fullfilled: seq[int32]
proc newPromiseRange(start, stop, stride: int32): ProducersRangePromises =
assert stop > start
assert stride > 0
new result
result.start = start
result.stop = stop
result.stride = stride
let bufLen = (result.stop - result.start + result.stride-1) div result.stride
result.fullfilled.newSeq(bufLen)
proc getInternalIndex(pr: ProducersRangePromises, iterIndex: int32): int32 {.inline.} =
assert iterIndex in pr.start ..< pr.stop
result = (iterIndex - pr.start) div pr.stride
proc len(pr: ProducersRangePromises): int32 {.inline.} =
pr.fullfilled.len.int32
proc ready*(pr: ProducersRangePromises, index: int32) =
## requires the public iteration index in [start, stop) range.
assert index in pr.start ..< pr.stop
var idx = pr.getInternalIndex(index)
while true:
pr.fullfilled[idx] += 1
idx = idx shr 1
if idx == 0:
break
# Consumers' side
# ----------------------------------------------------
type
ConsumerRangeDelayedTasks = object
## A Range-delayed task is a task that is dependent on a loop range.
## The consumer will schedule those tasks piece by piece as they become available.
## The consumer keeps track of tasks already dispatched and the ready tasks published by the producer.
## This is thread-local. Many consumer can depend on the same producers' promises.
## The last one should release ProducersPromises memory.
promises: ProducersRangePromises
dispatched: seq[int32]
proc newConsumerRangeDelayedTasks(pr: ProducersRangePromises): ConsumerRangeDelayedTasks =
result.promises = pr
result.dispatched.newSeq(pr.fullfilled.len)
proc dispatch*(cr: var ConsumerRangeDelayedTasks, internalIndex: int32) =
## requires the internal index in [0, dispatched.len) range.
var idx = internalIndex
while true:
cr.dispatched[idx] += 1
idx = idx shr 1
if idx == 0:
break
proc anyAvailable(cr: ConsumerRangeDelayedTasks, index: int32): bool {.inline.} =
let pr = cr.promises
if index >= pr.len:
return false
return pr.fullfilled[index] > cr.dispatched[index]
proc anyFulfilled*(cr: var ConsumerRangeDelayedTasks): tuple[foundNew: bool, index: int32] =
## This search for a fulfilled promise that wasn't already
## dispatched.
## If it finds one, it will be marked dispatched and its internal index will be returned.
while true:
# 3 cases (non-exclusive)
# 1. there is an availability in the left subtree
# 2. there is an availability in the right subtree
# 3. the current node is available
let left = 2*result.index + 1
let right = left + 1
if cr.anyAvailable(left):
result.index = left
elif cr.anyAvailable(right):
result.index = right
elif cr.anyAvailable(result.index):
# The current node is available and none of the children are
break
else:
assert result.foundNew == false
return
# We found an available node, update the dispatch tree
result.foundNew = true
assert result.index in 0 ..< cr.dispatched.len
cr.dispatch(result.index)
when isMainModule:
# block:
# let pr = newPromiseRange(32, 128, 32)
# for i in 32'i32..< 128:
# echo "i: ", i, ", internalIndex: ", pr.getInternalIndex(i)
block:
let pr = newPromiseRange(0, 10, 1)
var cr = newConsumerRangeDelayedTasks(pr)
echo pr[]
echo "Adding 7"
pr.ready(7)
echo pr[]
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
echo "Adding [2, 0]"
pr.ready(2)
pr.ready(0)
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
echo "Adding [3, 4]"
pr.ready(3)
pr.ready(4)
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
block:
let promise = cr.anyFulfilled()
echo "searching for promise: ", promise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment