Last active
July 4, 2023 11:30
-
-
Save arnetheduck/b6a7ac8f4b85490d26d464674e09d57d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import chronos, chronos/threadsync | |
import stew/ptrops | |
import std/locks | |
import std/typetraits | |
const Numbers = 1000 | |
const Threads = 100 | |
type | |
Node = object | |
next: ptr Node | |
data: UncheckedArray[byte] | |
LockingList = object | |
head, tail: ptr Node | |
lock: Lock | |
LockingChannel[T] = object | |
list: LockingList | |
sig: ThreadSignalPtr | |
total: int | |
template withLock(t, x: untyped) = | |
acquire(t.lock) | |
x | |
release(t.lock) | |
proc addNode(c: var LockingList, node: ptr Node): bool = | |
withLock(c.lock): | |
if c.head == nil: | |
c.head = node | |
c.tail = node | |
result = true | |
else: | |
c.head.next = node | |
c.head = node | |
result = false | |
proc popNode(c: var LockingList): tuple[node: ptr Node, last: bool] = | |
withLock(c.lock): | |
assert c.tail != nil | |
result[0] = c.tail | |
c.tail = c.tail.next | |
if c.tail == nil: | |
c.head = nil | |
result[1] = c.tail == nil | |
proc addTrivial[T](c: var LockingList, v: T): bool = | |
static: doAssert supportsCopyMem(T) | |
let | |
node = cast[ptr Node](allocShared0(sizeof(Node) + sizeof(T))) | |
copyMem(addr node.data[0], unsafeAddr v, sizeof(T)) | |
c.addNode(node) | |
proc popTrivial(c: var LockingList, T: type): tuple[data: T, last: bool] {.noinit.} = | |
let (node, last) = c.popNode() | |
copyMem(addr result[0], addr node[].data[0], sizeof(T)) | |
freeShared(node) | |
result[1] = last | |
proc addBytes(c: var LockingList, v: openArray[byte]): bool = | |
let | |
len = v.len | |
node = cast[ptr Node](allocShared0(sizeof(Node) + sizeof(int) + len)) | |
copyMem(addr node.data[0], unsafeAddr len, sizeof(len)) | |
copyMem(addr node.data[sizeof(int)], baseAddr v, v.len) | |
c.addNode(node) | |
proc popBytes(c: var LockingList): tuple[data: seq[byte], last: bool] = | |
let (node, last) = c.popNode() | |
var len: int | |
copyMem(addr len, addr node[].data[0], sizeof(int)) | |
result[0].setLen(len) | |
if len > 0: | |
copyMem(addr result[0][0], addr node[].data[0], len) | |
freeShared(node) | |
result[1] = last | |
proc add(c: var LockingChannel[seq[byte]], v: openArray[byte]) = | |
if c.list.addBytes(v): | |
discard c.sig.fireSync().expect("working send") | |
proc pop(c: ptr LockingChannel[seq[byte]]): Future[seq[byte]] {.async.} = | |
await threadsync.wait(c.sig) | |
let (data, last) = c.list.popBytes() | |
if not last: | |
discard c.sig.fire() | |
data | |
proc add[T](c: var LockingChannel[T], v: T) = | |
if c.list.addTrivial(v): | |
discard c.sig.fireSync().expect("working send") | |
proc pop[T](c: ptr LockingChannel[T]): Future[T] {.async.} = | |
await threadsync.wait(c.sig) | |
let (data, last) = c.list.popTrivial(T) | |
if not last: | |
await c.sig.fire() | |
data | |
proc consumer(c: ptr LockingChannel[int]) {.async.} = | |
for i in 0..<Numbers * Threads: | |
c[].total += await c.pop() | |
proc consumerThread(c: ptr LockingChannel[int]) {.thread.} = | |
waitFor consumer(c) | |
import os | |
proc producerThread(c: ptr LockingChannel[int]) {.thread.} = | |
for i in 0..<Numbers div 2: | |
c[].add(i) | |
sleep(1000) | |
for i in Numbers div 2..<Numbers: | |
c[].add(i) | |
var producers, consumers: array[Threads, Thread[ptr LockingChannel[int]]] | |
var channel: LockingChannel[int] | |
channel.list.lock.initLock() | |
channel.sig = ThreadSignalPtr.new()[] | |
for i in 0..<Threads: | |
createThread(producers[i], producerThread, addr channel) | |
createThread(consumers[0], consumerThread, addr channel) | |
joinThreads(producers) | |
joinThreads(consumers.toOpenArray(0, 0)) | |
doAssert channel.total == ((Numbers * (Numbers-1)) div 2) * Threads |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import chronos, taskpools, taskpools/channels_spsc_single, chronos/threadsync | |
import os | |
type | |
ThreadSignalTask[T] = object | |
chan: ChannelSPSCSingle[T] | |
sig: ThreadSignalPtr | |
proc someTask(task: ptr ThreadSignalTask[int], v: int) = | |
echo "starting sleep ", v | |
sleep(v) | |
discard task.chan.trySend(v) | |
discard task.sig.fireSync() | |
proc consumer(task: ptr ThreadSignalTask[int]): Future[int] {.async.} = | |
await task[].sig.wait() | |
discard task.chan.tryRecv(result) | |
task[].sig.close() | |
deallocShared(task) | |
proc main() = | |
let tp = Taskpool.new() | |
var v: seq[Future[int]] | |
for i in 0..10: | |
let task = ThreadSignalTask[int].createShared(1) | |
task[].sig = ThreadSignalPtr.new().expect("can create") | |
tp.spawn someTask(task, i * 10) | |
v.add consumer(task) | |
while v.len > 0: | |
let taskFut = v.pop() | |
echo waitFor(taskFut) | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment