Skip to content

Instantly share code, notes, and snippets.

@sharplet
Last active February 17, 2024 02:18
Show Gist options
  • Save sharplet/b2f65b67a3761043ba675150cad59b51 to your computer and use it in GitHub Desktop.
Save sharplet/b2f65b67a3761043ba675150cad59b51 to your computer and use it in GitHub Desktop.
PublisherQueue — Serialise the execution of multiple publishers like OperationQueue
import Combine
/// A type-erased wrapper around a publisher's failure.
///
/// It lets a failure be carried as a plain value (inside a completion event)
/// through a stream whose own failure type is `Never`.
public enum AnyError: Error {
  /// The "failure" of a publisher that cannot fail. `Never` has no values,
  /// so this case exists only to satisfy the type system and can never be
  /// constructed at runtime.
  case never(Never)

  /// A real upstream failure, stored as an `Error` existential.
  case failure(Error)
}
/// A type-erased publisher event: either an emitted value or a completion.
public enum AnyEvent {
  /// A value emitted upstream, boxed as `Any`.
  case output(Any)

  /// A completion, with any failure wrapped in `AnyError`.
  case completion(Subscribers.Completion<AnyError>)
}
extension Publisher where Failure == Never {
  /// Erases this publisher's output type, yielding a never-failing stream
  /// of `AnyEvent` values.
  ///
  /// Every output is wrapped in `AnyEvent.output`. The `catch` stage would
  /// turn a failure into `.completion(.failure(.never(...)))`, but because
  /// `Failure` is `Never` here there is no error value it could be called with.
  ///
  /// - Returns: A publisher of `AnyEvent` whose failure type is `Never`.
  public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> {
    return map { value in AnyEvent.output(value) }
      .catch { impossible in
        Just(AnyEvent.completion(.failure(.never(impossible))))
      }
      .eraseToAnyPublisher()
  }
}
extension Publisher {
  /// Erases both the output and failure types of this publisher, yielding a
  /// never-failing stream of `AnyEvent` values.
  ///
  /// Every output is wrapped in `AnyEvent.output`. If the upstream fails, the
  /// error is caught and re-emitted as a single
  /// `.completion(.failure(.failure(error)))` value. A normal finish is not
  /// converted into an event by this method.
  ///
  /// - Returns: A publisher of `AnyEvent` whose failure type is `Never`.
  public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> {
    return map { value in AnyEvent.output(value) }
      .catch { error in
        Just(AnyEvent.completion(.failure(.failure(error))))
      }
      .eraseToAnyPublisher()
  }
}
import Combine
/// Funnels publishers through a shared queue that limits how many of them
/// are connected at the same time.
///
/// A publisher is wrapped with `queuedPublisher(_:)`. The wrapped publisher is
/// only handed to the queue once the returned publisher receives a
/// subscription; the queue then connects it through
/// `flatMap(maxPublishers:)`.
public final class PublisherQueue {
  /// A unit of queued work: a type-erased event stream that does not start
  /// until it is connected.
  private typealias Operation = Publishers.MakeConnectable<AnyPublisher<AnyEvent, Never>>

  /// Entry point through which operations are submitted to the queue.
  private let operationQueue: PassthroughSubject<Operation, Never>

  /// Keeps the queue's own pipeline alive for the lifetime of the instance.
  private var subscriptions: Set<AnyCancellable>

  /// Creates a queue.
  ///
  /// - Parameters:
  ///   - size: Capacity of the buffer that holds submitted operations. When
  ///     the buffer is full, newly submitted operations are dropped
  ///     (`.dropNewest`); nothing else in this class connects them.
  ///   - maxConcurrentPublishers: Passed to `flatMap(maxPublishers:)` to bound
  ///     how many operations are subscribed at once. Defaults to `.max(1)`.
  public init(size: Int, maxConcurrentPublishers: Subscribers.Demand = .max(1)) {
    let pending = PassthroughSubject<Operation, Never>()
    operationQueue = pending
    subscriptions = []

    // Subscribing to `autoconnect()` is what actually starts an operation.
    let worker = pending
      .buffer(size: size, prefetch: .keepFull, whenFull: .dropNewest)
      .flatMap(maxPublishers: maxConcurrentPublishers) { queued in
        queued.autoconnect()
      }
      .sink(receiveValue: { _ in })
    subscriptions.insert(worker)
  }

  /// Wraps a never-failing publisher so that it runs through this queue.
  ///
  /// - Parameter publisher: The publisher to enqueue.
  /// - Returns: A publisher that re-emits `publisher`'s outputs. Subscribing
  ///   to it submits the underlying operation to the queue.
  public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure>
    where P.Failure == Never
  {
    let queue = operationQueue
    let operation: Operation = publisher.eraseToAnyEventPublisher().makeConnectable()

    return operation
      .flatMap { event -> AnyPublisher<P.Output, P.Failure> in
        switch event {
        case .completion(.failure(.failure)):
          fatalError("unreachable")
        case .completion(.finished):
          return Empty<P.Output, P.Failure>().eraseToAnyPublisher()
        case .output(let value):
          // The value was boxed as `Any` from a `P.Output`; cast it back.
          return Result<P.Output, P.Failure>.success(value as! P.Output)
            .publisher
            .eraseToAnyPublisher()
        }
      }
      .handleEvents(receiveSubscription: { _ in
        // Submit the operation only once somebody has subscribed.
        queue.send(operation)
      })
      .eraseToAnyPublisher()
  }

  /// Wraps a publisher so that it runs through this queue.
  ///
  /// - Parameter publisher: The publisher to enqueue.
  /// - Returns: A publisher that re-emits `publisher`'s outputs and failure.
  ///   Subscribing to it submits the underlying operation to the queue.
  public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> {
    let queue = operationQueue
    let operation: Operation = publisher.eraseToAnyEventPublisher().makeConnectable()

    return operation
      .setFailureType(to: P.Failure.self)
      .flatMap { event -> AnyPublisher<P.Output, P.Failure> in
        switch event {
        case .completion(.failure(.failure(let error))):
          // The error was erased from a `P.Failure`; cast it back.
          return Result<P.Output, P.Failure>.failure(error as! P.Failure)
            .publisher
            .eraseToAnyPublisher()
        case .completion(.finished):
          return Empty<P.Output, P.Failure>().eraseToAnyPublisher()
        case .output(let value):
          // The value was boxed as `Any` from a `P.Output`; cast it back.
          return Result<P.Output, P.Failure>.success(value as! P.Output)
            .publisher
            .eraseToAnyPublisher()
        }
      }
      .handleEvents(receiveSubscription: { _ in
        // Submit the operation only once somebody has subscribed.
        queue.send(operation)
      })
      .eraseToAnyPublisher()
  }
}
import Combine
import Dispatch
/// Makes a publisher that delivers one random `Int` shortly after it is
/// subscribed to.
///
/// The work is wrapped in `Deferred`, so a new `Future` is created for each
/// subscriber. The value is drawn from the full `Int` range and delivered
/// from a global dispatch queue after a 0.01 second delay.
func randomInt() -> Deferred<Future<Int, Never>> {
  let delay = 0.01
  return Deferred {
    Future<Int, Never> { fulfill in
      let deadline = DispatchTime.now() + delay
      DispatchQueue.global().asyncAfter(deadline: deadline) {
        let value = Int.random(in: Int.min ... Int.max)
        fulfill(.success(value))
      }
    }
  }
}
// Demo: push 1000 asynchronous publishers through a single queue and wait
// until every one of them has completed.
let q = PublisherQueue(size: .max)
var subscriptions = Set<AnyCancellable>()
var resultCount = 0
let group = DispatchGroup()

for _ in 1 ... 1000 {
  // Balanced by the `leave()` in the completion handler below.
  group.enter()
  let token = q.queuedPublisher(randomInt()).sink(
    receiveCompletion: { _ in group.leave() },
    receiveValue: { _ in resultCount += 1 }
  )
  subscriptions.insert(token)
}

// Block until every queued publisher has completed.
group.wait()
print(resultCount, "results") // 1000 results
@sharplet
Copy link
Author

Update: The enqueued publisher is no longer subscribed to eagerly; it is now subscribed to lazily, when the returned publisher itself receives a subscriber. enqueue() has been renamed to queuedPublisher() to reflect this change.

@sharplet
Copy link
Author

sharplet commented Nov 3, 2020

This is now available as a Swift package: https://github.com/sharplet/PublisherQueue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment