Skip to content

Instantly share code, notes, and snippets.

@yannxou
Created November 28, 2024 14:38
Show Gist options
  • Save yannxou/eed54b2c76e548c171a43c37da8ff567 to your computer and use it in GitHub Desktop.
Save yannxou/eed54b2c76e548c171a43c37da8ff567 to your computer and use it in GitHub Desktop.
Publisher+expect
import Combine
import Foundation
enum PublisherExpectationError: Error {
case failedWithError(Error)
case finishedWithoutMatchingValue
case timedOut
}
public extension Publisher {
/// Awaits for the first value emitted by this publisher that satifsfies the given condition within a certain time. If no matching value is emitted within the given `timeout` an error is thrown.
/// - Parameters:
/// - condition: The condition on the emitted value to be satisfied.
/// - action: The initial action that is expected to make this publisher emit a valid value.
/// - timeout: Maximum time to wait for the first matching value to be emitted.
/// - Returns: The first matching value emitted by the publisher.
func asyncFirst(where condition: @escaping (Output) -> Bool, on action: @escaping () async -> Void, timeout: TimeInterval) async throws -> Output {
try await withCheckedThrowingContinuation { continuation in
var cancellable: AnyCancellable?
var success = false
let startDate = Date()
cancellable = self
.handleEvents(receiveSubscription: { _ in
Task {
await action()
}
})
.timeout(.seconds(timeout), scheduler: DispatchQueue.main)
.sink(
receiveCompletion: { completion in
if case let .failure(error) = completion {
continuation.resume(throwing: PublisherExpectationError.failedWithError(error))
} else if case .finished = completion, !success {
if startDate.addingTimeInterval(timeout) < Date() {
continuation.resume(throwing: PublisherExpectationError.timedOut)
} else {
continuation.resume(throwing: PublisherExpectationError.finishedWithoutMatchingValue)
}
}
cancellable?.cancel()
},
receiveValue: { value in
if condition(value) {
success = true
continuation.resume(with: .success(value))
cancellable?.cancel()
}
}
)
}
}
}
public extension Publisher where Output: Equatable {
/// Awaits for the first value emitted by this publisher that is equal to the given value. If no matching value is emitted within the given `timeout` an error is thrown.
/// - Parameters:
/// - value: The value expected to be emitted.
/// - action: The initial action that is expected to make this publisher emit a valid value.
/// - timeout: Maximum time to wait for the value to be emitted.
/// - Returns: The first matching value emitted by the publisher.
func asyncFirst(equalTo value: Output, on action: @escaping () async -> Void, timeout: TimeInterval) async throws -> Output {
try await asyncFirst(where: { $0 == value }, on: action, timeout: timeout)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment