Created
January 2, 2020 21:26
-
-
Save mratsim/7838e005f818e2cfd9c78833e7eef05b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sequtils | |
# Producers' side
# ----------------------------------------------------
# Note: in this example the producer and the consumer run on the same thread.
# In the actual implementation, `ProducersRangePromises` must be atomically
# reference-counted, and the items in `fullfilled` must be atomic as well.
type
  ProducersRangePromises = ref object
    ## Promise counters over a strided for-loop range, kept in a flat
    ## buffer that the procs of this module navigate as a binary tree
    ## (a "distributed" binary indexed tree).
    ## A producer that fulfills a promise updates the counters in this tree.
    #
    # Author's notes for a threaded version:
    # - only monotonic increments are applied to the counters, which is
    #   what is expected to make the updates easy to do thread-safely;
    # - the main concern is false sharing / cache ping-pong when many
    #   threads fulfill promises stored in the same cache line, but padding
    #   the counters would be very memory inefficient.

    # Loop bounds: iterations cover the half-open range [start, stop).
    start: int32
    stop: int32
    # Step between two consecutive iterations.
    stride: int32
    # Counter buffer; `newPromiseRange` sizes it to one slot per iteration.
    fullfilled: seq[int32]
proc newPromiseRange(start, stop, stride: int32): ProducersRangePromises =
  ## Allocates the promise tracker for the loop range `[start, stop)`
  ## walked with step `stride`. Every counter starts at zero.
  ##
  ## `stop` must be greater than `start` and `stride` must be positive;
  ## both conditions are checked with `assert`.
  assert stop > start
  assert stride > 0

  # One slot per iteration, i.e. ceil((stop - start) / stride).
  let slotCount = (stop - start + stride - 1) div stride
  result = ProducersRangePromises(
    start: start,
    stop: stop,
    stride: stride,
    fullfilled: newSeq[int32](slotCount)
  )
proc getInternalIndex(pr: ProducersRangePromises, iterIndex: int32): int32 {.inline.} =
  ## Converts the public iteration index `iterIndex`, which must lie in
  ## `[pr.start, pr.stop)`, into the position of its slot in the counter
  ## buffer: the distance from `pr.start` divided by `pr.stride`.
  assert iterIndex in pr.start ..< pr.stop
  let offset = iterIndex - pr.start
  offset div pr.stride
proc len(pr: ProducersRangePromises): int32 {.inline.} =
  ## Number of slots in the counter buffer, converted to `int32`.
  int32(len(pr.fullfilled))
proc ready*(pr: ProducersRangePromises, index: int32) =
  ## Records that the promise for iteration `index` has been fulfilled.
  ##
  ## `index` is the public iteration index and must lie in `[start, stop)`.
  ## The counter of the matching internal slot is incremented first, then
  ## the counter of every slot obtained by repeatedly halving the slot
  ## number (`shr 1`). The walk stops as soon as the halved value is 0, so
  ## slot 0 is only incremented when it is the slot of `index` itself.
  assert index in pr.start ..< pr.stop

  var slot = pr.getInternalIndex(index)
  # The starting slot is always counted, even when it is slot 0.
  pr.fullfilled[slot] += 1
  slot = slot shr 1
  while slot != 0:
    pr.fullfilled[slot] += 1
    slot = slot shr 1
# Consumers' side
# ----------------------------------------------------
type
  ConsumerRangeDelayedTasks = object
    ## Consumer-side bookkeeping for a task that depends on a loop range
    ## (a "range-delayed" task).
    ##
    ## The consumer schedules such a task piece by piece, as the pieces
    ## become available. To do so it tracks both the promises published as
    ## ready by the producer and the pieces it has already dispatched.
    ##
    ## This object is meant to be thread-local. Many consumers can depend on
    ## the same producers' promises; the last one should release the
    ## `ProducersRangePromises` memory.

    # Shared handle to the producer-side counters.
    promises: ProducersRangePromises
    # Per-slot dispatch counters; `newConsumerRangeDelayedTasks` gives this
    # buffer the same length as `promises.fullfilled`.
    dispatched: seq[int32]
proc newConsumerRangeDelayedTasks(pr: ProducersRangePromises): ConsumerRangeDelayedTasks =
  ## Builds the consumer-side tracker attached to the promises `pr`.
  ## The dispatch counters are zero-initialized, one per slot of
  ## `pr.fullfilled`.
  ConsumerRangeDelayedTasks(
    promises: pr,
    dispatched: newSeq[int32](pr.fullfilled.len)
  )
proc dispatch*(cr: var ConsumerRangeDelayedTasks, internalIndex: int32) =
  ## Records that the slot `internalIndex` has been dispatched.
  ##
  ## `internalIndex` is an internal index and must lie in
  ## `[0, dispatched.len)`. The counter of that slot is incremented first,
  ## then the counter of every slot obtained by repeatedly halving the slot
  ## number (`shr 1`). The walk stops as soon as the halved value is 0, so
  ## slot 0 is only incremented when `internalIndex` is 0.
  var slot = internalIndex
  # The starting slot is always counted, even when it is slot 0.
  cr.dispatched[slot] += 1
  slot = slot shr 1
  while slot != 0:
    cr.dispatched[slot] += 1
    slot = slot shr 1
proc anyAvailable(cr: ConsumerRangeDelayedTasks, index: int32): bool {.inline.} =
  ## Tells whether the fulfilled counter of slot `index` is strictly greater
  ## than this consumer's dispatched counter for the same slot.
  ## An `index` at or past the end of the promise buffer yields `false`.
  index < cr.promises.len and
    cr.promises.fullfilled[index] > cr.dispatched[index]
proc anyFulfilled*(cr: var ConsumerRangeDelayedTasks): tuple[foundNew: bool, index: int32] =
  ## Searches for a fulfilled promise that this consumer has not
  ## dispatched yet.
  ##
  ## Returns `(foundNew: true, index: i)` where `i` is the internal index of
  ## such a promise, after marking it as dispatched. Returns
  ## `(foundNew: false, index: 0)` when nothing new is available.
  ##
  ## The traversal has to follow the layout that `ready` and `dispatch`
  ## create when they propagate counters with `idx shr 1` and stop before
  ## reaching slot 0:
  ## - slot 0 is never incremented on behalf of another slot, so its
  ##   counters describe slot 0 only;
  ## - slots `1 ..< len` form a tree rooted at slot 1 in which the children
  ##   of slot `i` are `2*i` and `2*i + 1`, and the counters of a slot cover
  ##   the slot itself plus all of its descendants.
  ##
  ## Descending through `2*i + 1` / `2*i + 2` from slot 0 does not match that
  ## layout and can report a slot whose promise was never fulfilled.
  if cr.anyAvailable(0):
    # Slot 0 is standalone: availability can only come from slot 0 itself.
    result.index = 0
  elif cr.anyAvailable(1):
    # Something is pending in the tree rooted at slot 1: walk down to it.
    result.index = 1
    while true:
      let left = 2*result.index
      let right = left + 1
      if cr.anyAvailable(left):
        result.index = left
      elif cr.anyAvailable(right):
        result.index = right
      else:
        # The subtree of the current slot has a pending promise and none
        # of its children do, so the pending promise is the current slot.
        break
  else:
    assert result.foundNew == false
    return

  # We found an available slot, update the dispatch counters.
  result.foundNew = true
  assert result.index in 0 ..< cr.dispatched.len
  cr.dispatch(result.index)
when isMainModule:
  # Disabled check of the public-index -> internal-index mapping:
  # block:
  #   let pr = newPromiseRange(32, 128, 32)
  #   for i in 32'i32..< 128:
  #     echo "i: ", i, ", internalIndex: ", pr.getInternalIndex(i)

  # Demo: a producer and a consumer sharing promises over the range [0, 10).
  block:
    let pr = newPromiseRange(0, 10, 1)
    var cr = newConsumerRangeDelayedTasks(pr)
    echo pr[]

    echo "Adding 7"
    pr.ready(7)
    echo pr[]
    # Search twice after a single promise was published.
    for _ in 1 .. 2:
      echo "searching for promise: ", cr.anyFulfilled()

    echo "Adding [2, 0]"
    for iteration in [2'i32, 0]:
      pr.ready(iteration)
    echo "searching for promise: ", cr.anyFulfilled()

    echo "Adding [3, 4]"
    for iteration in [3'i32, 4]:
      pr.ready(iteration)
    # Keep searching, including after the published promises ran out.
    for _ in 1 .. 5:
      echo "searching for promise: ", cr.anyFulfilled()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment