Last active
August 2, 2024 11:00
-
-
Save fruitcoder/b8d8755e1e82a5d1b991d8f28a156610 to your computer and use it in GitHub Desktop.
Trying to add a timeout functionality to AsyncStream fails because AsyncStream isn't Sendable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extension AsyncSequence where Self.Element : Equatable & Sendable { | |
func first( | |
_ search: Self.Element, | |
timeout: TimeInterval = 1.0 | |
) async throws { | |
do { | |
try await _until(where: { $0 == search }, timeout: timeout) | |
} catch is CancellationError { | |
print("cancelled before element was emitted or timeout was reached.") | |
throw CancellationError() | |
} catch is TimeoutError { | |
print("stream didn't emit \(search) in within timeout (\(timeout)).") | |
throw TimeoutError() | |
} catch { | |
print("stream throwed an error within timeout (\(timeout)).") | |
throw error | |
} | |
} | |
private func _until(where predicate: @Sendable @escaping (Self.Element) async throws -> Bool, timeout: TimeInterval) async throws { | |
try await withTimeout(timeout) { | |
_ = try await first(where: predicate) // error: Capture of 'self' with non-sendable type 'Self' in a `@Sendable` closure | |
} | |
} | |
} | |
func firstOf<R: Sendable>( | |
_ f1: @Sendable @escaping () async throws -> R, | |
or f2: @Sendable @escaping () async throws -> R | |
) async throws -> R { | |
try Task.checkCancellation() | |
return try await withThrowingTaskGroup(of: R.self) { group in | |
try Task.checkCancellation() | |
guard group.addTaskUnlessCancelled(operation: { try await f1() }) else { | |
throw CancellationError() | |
} | |
guard group.addTaskUnlessCancelled(operation: { try await f2() }) else { | |
group.cancelAll() | |
throw CancellationError() | |
} | |
guard let first = try await group.next() else { | |
fatalError() | |
} | |
group.cancelAll() | |
return first | |
} | |
} | |
func withTimeout<R: Sendable>( | |
_ seconds: Double, | |
_ work: @Sendable @escaping () async throws -> R | |
) async throws -> R { | |
try await firstOf { | |
try await work() | |
} or: { | |
try? await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000)) | |
throw TimeoutError() | |
} | |
} | |
struct TimeoutError: Error {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment