Skip to content

Instantly share code, notes, and snippets.

@rlaguilar
Last active October 3, 2024 03:31
Show Gist options
  • Save rlaguilar/984bf196eb13a783617e7b1a2a6edee2 to your computer and use it in GitHub Desktop.
Save rlaguilar/984bf196eb13a783617e7b1a2a6edee2 to your computer and use it in GitHub Desktop.
Producer-consumer pattern using Swift concurrency
import Foundation
/// A bounded asynchronous FIFO buffer for the producer/consumer pattern.
///
/// Two internal streams work as counters: `emptySlots` holds one token per free slot and
/// `availableElements` holds the queued elements. `push` suspends until a free-slot token
/// is available, and `next` suspends until an element is available.
///
/// - Note: NOTE(review): every internal stream is drained through a single stored iterator,
///   so this looks designed for one producer task and one consumer task — confirm before
///   sharing a buffer between several producers or several consumers.
actor Buffer<Element> {
    private let emptySlots = Stream<Void>()
    private let availableElements = Stream<Element>()

    /// Creates a buffer that can hold up to `capacity` elements.
    ///
    /// - Parameter capacity: Number of free slots to start with. Must not be negative.
    init(capacity: Int) {
        precondition(capacity >= 0, "Buffer capacity must not be negative")
        for _ in 0 ..< capacity {
            emptySlots.push(())
        }
    }

    /// Enqueues `element`, suspending while no free slot is available.
    ///
    /// - Parameter element: The value to enqueue.
    /// - Returns: `true` if the element was enqueued; `false` if the buffer has been
    ///   finished and the element was not stored.
    func push(_ element: Element) async -> Bool {
        guard await emptySlots.next() != nil else {
            return false
        }
        // Free-slot tokens that were queued before `finish()` are still handed out afterwards,
        // so getting a token is not enough: the element stream itself must accept the value.
        return availableElements.push(element)
    }

    /// Dequeues the next element, suspending while none is available.
    ///
    /// - Returns: The next element, or `nil` once the element stream has ended.
    func next() async -> Element? {
        guard let element = await availableElements.next() else {
            return nil
        }
        // Hand the slot that was just vacated back to the producer side.
        emptySlots.push(())
        return element
    }

    /// Finishes both internal streams so that `push` and `next` can stop suspending.
    func finish() {
        emptySlots.finish()
        availableElements.finish()
    }

    // Wrapper around AsyncStream that provides an easier to use API
    private final class Stream<Value> {
        private let continuation: AsyncStream<Value>.Continuation
        private var iterator: AsyncStream<Value>.Iterator

        init() {
            // AsyncStream only exposes its continuation inside the build closure, so copy it
            // out through a local before storing it in a non-optional property.
            var capturedContinuation: AsyncStream<Value>.Continuation!
            let stream = AsyncStream(Value.self) { capturedContinuation = $0 }
            continuation = capturedContinuation
            iterator = stream.makeAsyncIterator()
        }

        /// Returns the next value from the stream, or `nil` when the stream has ended.
        func next() async -> Value? {
            await iterator.next()
        }

        /// Yields `element` into the stream.
        ///
        /// - Returns: `true` only when the stream reported the value as enqueued.
        @discardableResult
        func push(_ element: Value) -> Bool {
            if case .enqueued = continuation.yield(element) {
                return true
            }
            return false
        }

        /// Finishes the underlying stream.
        func finish() {
            continuation.finish()
        }
    }
}
import Foundation
/// Reference instant captured when this constant is initialized; `time()` measures from it.
let startTime = Date()

/// Returns the number of seconds elapsed between `startTime` and now.
func time() -> TimeInterval {
    let now = Date()
    return now.timeIntervalSince(startTime)
}
/// How quickly a participant works, expressed as a pause length.
enum Speed {
    case instant
    case fast
    case slow

    /// Pause associated with this speed: 0 for `instant`, 1 for `fast`, 3 for `slow`.
    var delay: UInt64 {
        switch self {
        case .instant: return 0
        case .fast: return 1
        case .slow: return 3
        }
    }
}
/// Starts a producer/consumer demo over a shared five-slot `Buffer<Int>`.
///
/// The function returns immediately; the work continues in unstructured tasks:
/// - The producer tries to enqueue the integers 0 through 9, pausing `producerSpeed.delay`
///   seconds after each successful push, then finishes the buffer.
/// - The consumer prints every received item, pausing `consumerSpeed.delay` seconds after
///   each one, until the buffer returns `nil`.
/// - Five seconds after the call, a block scheduled on the main queue finishes the buffer.
///
/// - Parameters:
///   - producerSpeed: Pause applied by the producer after each enqueued item.
///   - consumerSpeed: Pause applied by the consumer after each received item.
func run(producerSpeed: Speed, consumerSpeed: Speed) {
    let nanosecondsPerSecond: UInt64 = 1_000_000_000
    let shutdownDelaySeconds = 5
    let buffer = Buffer<Int>(capacity: 5)

    // producer
    Task {
        for i in 0 ..< 10 {
            guard await buffer.push(i) else {
                break
            }
            print("\(time()): Enqueued \(i)")
            do {
                try await Task.sleep(nanoseconds: producerSpeed.delay * nanosecondsPerSecond)
            } catch {
                // Sleeping threw (the task was cancelled): stop producing instead of crashing.
                break
            }
        }
        await buffer.finish()
        print("Done producing")
    }

    // consumer
    Task {
        while let item = await buffer.next() {
            print("\(time()): Received \(item)")
            do {
                try await Task.sleep(nanoseconds: consumerSpeed.delay * nanosecondsPerSecond)
            } catch {
                // Sleeping threw (the task was cancelled): stop consuming instead of crashing.
                break
            }
        }
        print("Done consuming")
    }

    // Finish the buffer after a fixed delay, whatever state the producer and consumer are in.
    DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(shutdownDelaySeconds)) {
        Task {
            await buffer.finish()
            print("Buffer finished")
        }
    }
}
// Demo entry point: start a producer with no pause against a consumer that pauses after each item.
run(producerSpeed: .instant, consumerSpeed: .fast)
// Run the main run loop so the process stays alive for the tasks and the main-queue
// callback started by `run`.
RunLoop.main.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment