Last active
February 17, 2024 02:18
-
-
Save sharplet/b2f65b67a3761043ba675150cad59b51 to your computer and use it in GitHub Desktop.
PublisherQueue — Serialise the execution of multiple publishers like OperationQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
public enum AnyError: Error { | |
case never(Never) | |
case failure(Error) | |
} | |
public enum AnyEvent { | |
case output(Any) | |
case completion(Subscribers.Completion<AnyError>) | |
} | |
extension Publisher where Failure == Never { | |
public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> { | |
map(AnyEvent.output) | |
.catch { error in Just(.completion(.failure(.never(error)))) } | |
.eraseToAnyPublisher() | |
} | |
} | |
extension Publisher { | |
public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> { | |
map(AnyEvent.output) | |
.catch { error in Just(.completion(.failure(.failure(error)))) } | |
.eraseToAnyPublisher() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
public final class PublisherQueue { | |
private typealias Operation = Publishers.MakeConnectable<AnyPublisher<AnyEvent, Never>> | |
private let operationQueue: PassthroughSubject<Operation, Never> | |
private var subscriptions: Set<AnyCancellable> | |
public init(size: Int, maxConcurrentPublishers: Subscribers.Demand = .max(1)) { | |
self.operationQueue = PassthroughSubject() | |
self.subscriptions = [] | |
operationQueue | |
.buffer(size: size, prefetch: .keepFull, whenFull: .dropNewest) | |
.flatMap(maxPublishers: maxConcurrentPublishers) { operation in | |
operation.autoconnect() | |
} | |
.sink { _ in } | |
.store(in: &subscriptions) | |
} | |
public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> | |
where P.Failure == Never | |
{ | |
let operation: Operation = publisher | |
.eraseToAnyEventPublisher() | |
.makeConnectable() | |
return operation.flatMap { event -> AnyPublisher<P.Output, P.Failure> in | |
switch event { | |
case let .output(value): | |
return Result.success(value as! P.Output).publisher.eraseToAnyPublisher() | |
case .completion(.finished): | |
return Empty().eraseToAnyPublisher() | |
case .completion(.failure(.failure)): | |
fatalError("unreachable") | |
} | |
} | |
.handleEvents(receiveSubscription: { [operationQueue] _ in | |
operationQueue.send(operation) | |
}) | |
.eraseToAnyPublisher() | |
} | |
public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> { | |
let operation: Operation = publisher | |
.eraseToAnyEventPublisher() | |
.makeConnectable() | |
return operation.setFailureType(to: P.Failure.self).flatMap { event -> AnyPublisher<P.Output, P.Failure> in | |
switch event { | |
case let .output(value): | |
return Result.success(value as! P.Output).publisher.eraseToAnyPublisher() | |
case .completion(.finished): | |
return Empty().eraseToAnyPublisher() | |
case let .completion(.failure(.failure(error))): | |
return Result.failure(error as! P.Failure).publisher.eraseToAnyPublisher() | |
} | |
} | |
.handleEvents(receiveSubscription: { [operationQueue] _ in | |
operationQueue.send(operation) | |
}) | |
.eraseToAnyPublisher() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import Dispatch | |
func randomInt() -> Deferred<Future<Int, Never>> { | |
Deferred { | |
Future { promise in | |
DispatchQueue.global().asyncAfter(deadline: .now() + 0.01) { | |
promise(.success(.random(in: .min ... .max))) | |
} | |
} | |
} | |
} | |
let q = PublisherQueue(size: .max) | |
var subscriptions = Set<AnyCancellable>() | |
var resultCount = 0 | |
let group = DispatchGroup() | |
for _ in 1 ... 1000 { | |
group.enter() | |
q.queuedPublisher(randomInt()).sink(receiveCompletion: { _ in group.leave() }) { number in | |
resultCount += 1 | |
}.store(in: &subscriptions) | |
} | |
group.wait() | |
print(resultCount, "results") // 1000 results |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Update: Instead of eagerly subscribing to the enqueued publisher, this now happens lazily upon subscription of the returned publisher.
enqueue()
has been renamed toqueuedPublisher()
to reflect this change.