Created
November 28, 2024 14:38
-
-
Save yannxou/eed54b2c76e548c171a43c37da8ff567 to your computer and use it in GitHub Desktop.
Publisher+expect
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import Foundation | |
enum PublisherExpectationError: Error { | |
case failedWithError(Error) | |
case finishedWithoutMatchingValue | |
case timedOut | |
} | |
public extension Publisher { | |
/// Awaits for the first value emitted by this publisher that satifsfies the given condition within a certain time. If no matching value is emitted within the given `timeout` an error is thrown. | |
/// - Parameters: | |
/// - condition: The condition on the emitted value to be satisfied. | |
/// - action: The initial action that is expected to make this publisher emit a valid value. | |
/// - timeout: Maximum time to wait for the first matching value to be emitted. | |
/// - Returns: The first matching value emitted by the publisher. | |
func asyncFirst(where condition: @escaping (Output) -> Bool, on action: @escaping () async -> Void, timeout: TimeInterval) async throws -> Output { | |
try await withCheckedThrowingContinuation { continuation in | |
var cancellable: AnyCancellable? | |
var success = false | |
let startDate = Date() | |
cancellable = self | |
.handleEvents(receiveSubscription: { _ in | |
Task { | |
await action() | |
} | |
}) | |
.timeout(.seconds(timeout), scheduler: DispatchQueue.main) | |
.sink( | |
receiveCompletion: { completion in | |
if case let .failure(error) = completion { | |
continuation.resume(throwing: PublisherExpectationError.failedWithError(error)) | |
} else if case .finished = completion, !success { | |
if startDate.addingTimeInterval(timeout) < Date() { | |
continuation.resume(throwing: PublisherExpectationError.timedOut) | |
} else { | |
continuation.resume(throwing: PublisherExpectationError.finishedWithoutMatchingValue) | |
} | |
} | |
cancellable?.cancel() | |
}, | |
receiveValue: { value in | |
if condition(value) { | |
success = true | |
continuation.resume(with: .success(value)) | |
cancellable?.cancel() | |
} | |
} | |
) | |
} | |
} | |
} | |
public extension Publisher where Output: Equatable { | |
/// Awaits for the first value emitted by this publisher that is equal to the given value. If no matching value is emitted within the given `timeout` an error is thrown. | |
/// - Parameters: | |
/// - value: The value expected to be emitted. | |
/// - action: The initial action that is expected to make this publisher emit a valid value. | |
/// - timeout: Maximum time to wait for the value to be emitted. | |
/// - Returns: The first matching value emitted by the publisher. | |
func asyncFirst(equalTo value: Output, on action: @escaping () async -> Void, timeout: TimeInterval) async throws -> Output { | |
try await asyncFirst(where: { $0 == value }, on: action, timeout: timeout) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment