Last active
April 23, 2023 12:48
-
-
Save tcldr/afa8eccb5119027c3a5327c84e52d7f3 to your computer and use it in GitHub Desktop.
Swift Concurrency Utilities for managing Transactional Resources
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A simple serial queue.
///
/// Every operation sent through the queue is executed in first-in first-out
/// order. Each operation completes its execution fully, even if it includes
/// suspension points, before the next operation commences.
///
/// - Tip: If calling from a global actor like `@MainActor`, annotate your closure
///   to get better ergonomics: `queue.send { @MainActor in /* ... */ }`
public final class AsyncSerialQueue: Sendable {
    /// A unit of work executed by the queue.
    public typealias Operation = @Sendable () async -> Void
    public typealias Continuation = AsyncStream<Operation>.Continuation
    public typealias YieldResult = Continuation.YieldResult

    /// Producer side of the stream of pending operations.
    private let continuation: Continuation
    /// The single consumer task; it awaits each operation before pulling the next.
    private let task: Task<Void, Never>

    /// Creates a queue and immediately starts the task that drains it.
    ///
    /// - Parameter bufferingPolicy: How operations that have been sent but not
    ///   yet started are buffered. Defaults to `.unbounded`.
    public init(bufferingPolicy: Continuation.BufferingPolicy = .unbounded) {
        let (operations, continuation) = AsyncStream.withExtractedContinuation(
            Operation.self, bufferingPolicy: bufferingPolicy)
        self.continuation = continuation
        // The task captures only the stream (never `self`), so the queue can be
        // deallocated while the task is still alive.
        self.task = Task {
            for await operation in operations {
                // Once cancelled, do not start any operation that is still buffered.
                guard !Task.isCancelled else { return }
                await operation()
            }
        }
    }

    deinit {
        // Prevent any further operation from starting. An operation that is
        // already running is not interrupted; cancellation is cooperative.
        task.cancel()
        // Terminate the stream explicitly so a consumer that is idle, waiting for
        // the next operation, wakes up and leaves its loop. This replaces the
        // previous approach of yielding a dummy no-op operation.
        continuation.finish()
    }

    /// Enqueues `operation` for execution after all previously sent operations.
    ///
    /// This call is synchronous and does not wait for the operation to run.
    ///
    /// - Parameter operation: The work to perform.
    /// - Returns: The result of yielding the operation to the underlying stream,
    ///   which indicates whether it was enqueued, dropped by the buffering policy,
    ///   or rejected because the stream has terminated.
    @discardableResult
    public func send(_ operation: @escaping Operation) -> YieldResult {
        continuation.yield(operation)
    }
}
/// An `AsyncReactor`.
///
/// A reactor can be used to protect a transactional resource where every
/// operation must be carried out in sequence. The closure passed to `send`
/// receives the protected resource as its parameter, through which operations
/// on that resource can be performed.
///
/// - Tip: If calling from a global actor like `@MainActor`, annotate your closure
///   to get better ergonomics: `reactor.send { @MainActor resource in /* ... */ }`
public final class AsyncReactor<Resource: Sendable>: Sendable {
    /// A unit of work that is handed the protected resource.
    public typealias Operation = @Sendable (Resource) async -> Void
    public typealias Continuation = AsyncStream<Operation>.Continuation
    public typealias YieldResult = Continuation.YieldResult

    /// Producer side of the stream of pending operations.
    private let continuation: Continuation
    /// The single consumer task; it awaits each operation before pulling the next.
    private let task: Task<Void, Never>

    /// Creates a reactor around `resource` and immediately starts the task that
    /// drains its operations.
    ///
    /// - Parameters:
    ///   - resource: The value passed to every operation.
    ///   - bufferingPolicy: How operations that have been sent but not yet
    ///     started are buffered. Defaults to `.unbounded`.
    public init(resource: Resource, bufferingPolicy: Continuation.BufferingPolicy = .unbounded) {
        let (operations, continuation) = AsyncStream.withExtractedContinuation(
            Operation.self, bufferingPolicy: bufferingPolicy)
        self.continuation = continuation
        // The task captures the stream and the resource (never `self`), so the
        // reactor can be deallocated while the task is still alive.
        self.task = Task {
            for await operation in operations {
                // Once cancelled, do not start any operation that is still buffered.
                guard !Task.isCancelled else { return }
                await operation(resource)
            }
        }
    }

    deinit {
        // Prevent any further operation from starting. An operation that is
        // already running is not interrupted; cancellation is cooperative.
        task.cancel()
        // Terminate the stream explicitly so a consumer that is idle, waiting for
        // the next operation, wakes up and leaves its loop. This replaces the
        // previous approach of yielding a dummy no-op operation.
        continuation.finish()
    }

    /// Enqueues `operation` for execution after all previously sent operations.
    ///
    /// This call is synchronous and does not wait for the operation to run.
    ///
    /// - Parameter operation: The work to perform on the protected resource.
    /// - Returns: The result of yielding the operation to the underlying stream,
    ///   which indicates whether it was enqueued, dropped by the buffering policy,
    ///   or rejected because the stream has terminated.
    @discardableResult
    public func send(_ operation: @escaping Operation) -> YieldResult {
        continuation.yield(operation)
    }
}
// AsyncStream addition that acts as the basis for the previous utilities. It
// separates an AsyncStream from its continuation so that elements can be sent in
// series via a non-async, non-blocking call from outside the AsyncStream
// initialiser.
public extension AsyncStream {
    /// Creates a stream and returns it together with the continuation that feeds it.
    ///
    /// This relies on `AsyncStream` invoking its build closure synchronously
    /// from within the initialiser, so the continuation has been captured by the
    /// time the initialiser returns. If that did not hold, the unwrap below
    /// would trap.
    ///
    /// - Parameters:
    ///   - type: The element type of the stream; used for type inference at the
    ///     call site.
    ///   - bufferingPolicy: The buffering policy handed to the stream.
    /// - Returns: The stream and the continuation used to yield elements into it.
    static func withExtractedContinuation(
        _ type: Element.Type,
        bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy
    ) -> (AsyncStream<Element>, AsyncStream<Element>.Continuation) {
        var captured: AsyncStream<Element>.Continuation?
        let stream = AsyncStream(Element.self, bufferingPolicy: bufferingPolicy) { captured = $0 }
        return (stream, captured!)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment