Last active
October 3, 2024 03:31
-
-
Save rlaguilar/984bf196eb13a783617e7b1a2a6edee2 to your computer and use it in GitHub Desktop.
Producer consumer pattern using Swift concurrency
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
/// A bounded, asynchronous FIFO buffer that hands elements from a producer to a consumer.
///
/// Capacity is modelled with two internal streams: `emptySlots` carries one token per
/// free slot and `availableElements` carries the queued values. `push(_:)` takes a token
/// before enqueuing a value, and `next()` gives a token back after dequeuing one.
actor Buffer<Element> {
    private let emptySlots = Stream<Void>()
    private let availableElements = Stream<Element>()

    /// Creates a buffer and seeds it with `capacity` free-slot tokens.
    init(capacity: Int) {
        for _ in 0 ..< capacity {
            emptySlots.push(())
        }
    }

    /// Waits until a slot is free, then enqueues `element`.
    ///
    /// - Parameter element: The value to enqueue.
    /// - Returns: `true` if the element was accepted; `false` if the buffer has been
    ///   finished, in which case the element is not enqueued.
    func push(_ element: Element) async -> Bool {
        guard await emptySlots.next() != nil else {
            return false
        }
        // Slot tokens that were still queued when `finish()` ran can be handed out
        // afterwards, so obtaining a token does not prove the buffer is open.
        // Report whether the element stream actually accepted the value.
        return availableElements.push(element)
    }

    /// Waits for the next queued element and frees its slot.
    ///
    /// - Returns: The next element, or `nil` once the element stream has ended.
    func next() async -> Element? {
        guard let element = await availableElements.next() else {
            return nil
        }
        emptySlots.push(())
        return element
    }

    /// Finishes both internal streams so that pending and future `push(_:)` / `next()`
    /// calls can complete instead of waiting forever.
    func finish() {
        emptySlots.finish()
        availableElements.finish()
    }

    // Wrapper around AsyncStream that provides an easier to use API
    private final class Stream<Value> {
        // Both are assigned during `init`; implicitly unwrapped so `self` can be
        // captured by the AsyncStream build closure before they have values.
        private var continuation: AsyncStream<Value>.Continuation!
        private var iterator: AsyncStream<Value>.Iterator!

        init() {
            let stream = AsyncStream(Value.self) { continuation in
                self.continuation = continuation
            }
            iterator = stream.makeAsyncIterator()
        }

        /// Waits for the next value, or returns `nil` once the stream has ended.
        ///
        /// NOTE(review): a single stored iterator is shared by all callers, so this
        /// presumably supports only one awaiting task per stream at a time — confirm.
        func next() async -> Value? {
            return await iterator.next()
        }

        /// Yields `element` into the stream.
        ///
        /// - Returns: `false` if the stream was already finished and the value was
        ///   therefore not delivered; `true` otherwise.
        @discardableResult
        func push(_ element: Value) -> Bool {
            if case .terminated = continuation.yield(element) {
                return false
            }
            return true
        }

        /// Marks the stream as finished.
        func finish() {
            continuation.finish()
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
/// Reference instant against which `time()` measures elapsed seconds.
let startTime = Date()

/// Returns the number of seconds between `startTime` and the moment of the call.
func time() -> TimeInterval {
    let now = Date()
    return now.timeIntervalSince(startTime)
}
/// Pacing options used by the demo's producer and consumer.
enum Speed {
    case instant
    case fast
    case slow

    /// Length of the pause associated with this speed (0, 1 or 3).
    /// The demo multiplies it by 1_000_000_000 before sleeping in nanoseconds,
    /// i.e. it is used as a number of seconds.
    var delay: UInt64 {
        let seconds: UInt64
        switch self {
        case .instant: seconds = 0
        case .fast: seconds = 1
        case .slow: seconds = 3
        }
        return seconds
    }
}
/// Runs the producer/consumer demo on a five-slot `Buffer<Int>`.
///
/// Starts one task that pushes the integers 0 through 9, one task that drains the
/// buffer, and schedules a main-queue callback that finishes the buffer five seconds
/// after the call. The function itself returns immediately.
///
/// - Parameters:
///   - producerSpeed: Pause applied after each element is enqueued.
///   - consumerSpeed: Pause applied after each element is received.
func run(producerSpeed: Speed, consumerSpeed: Speed) {
    let buffer = Buffer<Int>(capacity: 5)

    // producer
    Task {
        let pause = producerSpeed.delay * 1_000_000_000
        for i in 0 ..< 10 {
            // Stop producing as soon as the buffer refuses an element.
            let accepted = await buffer.push(i)
            if !accepted {
                break
            }
            print("\(time()): Enqueued \(i)")
            try! await Task.sleep(nanoseconds: pause)
        }
        await buffer.finish()
        print("Done producing")
    }

    // consumer
    Task {
        let pause = consumerSpeed.delay * 1_000_000_000
        while true {
            guard let item = await buffer.next() else {
                break
            }
            print("\(time()): Received \(item)")
            try! await Task.sleep(nanoseconds: pause)
        }
        print("Done consuming")
    }

    // Finish the buffer after five seconds regardless of how far the tasks got.
    let deadline = DispatchTime.now() + .seconds(5)
    DispatchQueue.main.asyncAfter(deadline: deadline) {
        Task {
            await buffer.finish()
            print("Buffer finished")
        }
    }
}
// Start the demo with a producer that does not pause and a consumer that pauses between items.
run(producerSpeed: .instant, consumerSpeed: .fast) | |
// `run` only schedules work and returns, so keep the main run loop alive;
// the main-queue callback scheduled by `run` also needs it.
RunLoop.main.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment