Last active
August 11, 2024 00:21
-
-
Save ole/b7872efc1824bfdc6da1e414d957b33f to your computer and use it in GitHub Desktop.
Using CheckedContinuation to communicate between two concurrent tasks.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import _Concurrency | |
/// A one-slot hand-off point between two concurrent tasks.
///
/// One task suspends in `receive()`; another task wakes it by calling
/// `send(_:)`. At most one receiver may be suspended at any time.
actor Channel<Output> {
    /// Continuation of the currently suspended `receive()` call, or `nil`
    /// when nobody is waiting.
    private var pendingReceiver: CheckedContinuation<Output, Never>?

    deinit {
        // TODO: if a receiver is still pending here, resume it by throwing
        // `CancellationError()`? (The continuation's failure type is currently
        // `Never`, so that would require changing its type.)
    }

    /// Delivers `value` to the suspended receiver, if there is one.
    ///
    /// If there's no receiver, the sent value will be lost.
    func send(_ value: Output) {
        guard let receiver = pendingReceiver else { return }
        // Clear the slot so a later `receive()` is allowed again.
        pendingReceiver = nil
        receiver.resume(returning: value)
    }

    /// Suspends the caller until the next `send(_:)` and returns the sent value.
    ///
    /// - Precondition: Only one `receive` may be active at a time.
    ///   Calling `send` will reset the receive state.
    func receive() async -> Output {
        precondition(pendingReceiver == nil, "Called receive() a second time while the first is still active")
        return await withCheckedContinuation { (continuation: CheckedContinuation<Output, Never>) in
            pendingReceiver = continuation
        }
    }
}
/// A bounded FIFO buffer shared between producer and consumer tasks.
///
/// `append(_:)` suspends while the buffer is full and `next()` suspends while
/// it is empty. Waiting tasks are parked as continuations stored inside this
/// actor, so checking the buffer state and registering as a waiter happen in
/// one actor-isolated step. This means a wake-up cannot be missed, and any
/// number of producers and consumers may wait at the same time.
///
/// Task cancellation is not observed: a suspended caller stays suspended until
/// the buffer state changes.
actor Buffer<Element> {
    /// Maximum number of elements the buffer holds at once.
    let bufferSize: Int

    /// Buffered elements, oldest first.
    private var elements: [Element]

    /// Producers suspended in `append(_:)` because the buffer was full.
    private var waitingProducers: [CheckedContinuation<Void, Never>] = []

    /// Consumers suspended in `next()` because the buffer was empty.
    private var waitingConsumers: [CheckedContinuation<Void, Never>] = []

    /// Creates an empty buffer.
    /// - Parameter size: Maximum number of elements held at once. With a size
    ///   of zero or less, `append(_:)` never finds room and suspends forever.
    init(size: Int) {
        self.bufferSize = size
        var storage: [Element] = []
        storage.reserveCapacity(size)
        self.elements = storage
    }

    /// Appends `value`, suspending while the buffer is full.
    /// - Parameter value: The element to enqueue at the back of the buffer.
    func append(_ value: Element) async {
        // Re-check after every wake-up: another producer may have taken the
        // free slot before this task got to run again.
        while elements.count >= bufferSize {
            print("producer waiting for buffer to empty")
            await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
                waitingProducers.append(continuation)
            }
        }
        elements.append(value)
        // One new element can satisfy one waiting consumer.
        if !waitingConsumers.isEmpty {
            waitingConsumers.removeFirst().resume()
        }
    }

    /// Removes and returns the oldest element, suspending while the buffer is empty.
    /// - Returns: The next element. The return type is optional, but this
    ///   implementation never returns `nil`.
    func next() async -> Element? {
        // Re-check after every wake-up: another consumer may have taken the
        // element before this task got to run again.
        while elements.isEmpty {
            print("consumer waiting for buffer to fill")
            await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
                waitingConsumers.append(continuation)
            }
        }
        let element = elements.removeFirst()
        // One freed slot can satisfy one waiting producer.
        if !waitingProducers.isEmpty {
            waitingProducers.removeFirst().resume()
        }
        return element
    }
}
// Demo: one consumer and one producer share a buffer that holds two elements.
Task {
    let buffer = Buffer<Int>(size: 2)

    // Consumer task: prints each element, then pauses for 0.1 s before
    // asking for the next one.
    Task {
        while let value = await buffer.next() {
            print(value)
            try! await Task.sleep(nanoseconds: 100_000_000)
        }
    }

    // Producer task: sends one element, pauses for 0.4 s, then sends four
    // more back to back.
    Task {
        await buffer.append(1)
        try! await Task.sleep(nanoseconds: 400_000_000)
        for value in 2...5 {
            await buffer.append(value)
        }
    }
}
import PlaygroundSupport | |
// Ask the playground to keep running after the top-level code finishes, so the
// asynchronous tasks started above get a chance to execute.
PlaygroundPage.current.needsIndefiniteExecution = true |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment