Skip to content

Instantly share code, notes, and snippets.

@fruitcoder
Last active August 2, 2024 11:00
Show Gist options
  • Save fruitcoder/b8d8755e1e82a5d1b991d8f28a156610 to your computer and use it in GitHub Desktop.
Save fruitcoder/b8d8755e1e82a5d1b991d8f28a156610 to your computer and use it in GitHub Desktop.
Trying to add timeout functionality to AsyncStream fails because AsyncStream isn't Sendable
// `Self: Sendable` is required because the sequence itself is captured by the
// `@Sendable` closure that runs in a child task (see `_until`). Without the
// constraint the compiler rejects the capture of a non-sendable `Self`.
extension AsyncSequence where Self: Sendable, Self.Element: Equatable & Sendable {
  /// Waits for the sequence to emit an element equal to `search`, giving up
  /// after `timeout` seconds.
  ///
  /// The call returns normally as soon as a matching element is seen. It also
  /// returns normally when the sequence finishes without a match, because the
  /// optional result of `first(where:)` is discarded.
  ///
  /// - Parameters:
  ///   - search: The element to wait for.
  ///   - timeout: Maximum time to wait, in seconds. Defaults to one second.
  /// - Throws: `CancellationError` if cancellation is observed first,
  ///   `TimeoutError` if the timeout elapses first, or any error thrown by the
  ///   sequence itself. Each case is logged with `print` before rethrowing.
  func first(
    _ search: Self.Element,
    timeout: TimeInterval = 1.0
  ) async throws {
    do {
      try await _until(where: { $0 == search }, timeout: timeout)
    } catch is CancellationError {
      print("cancelled before element was emitted or timeout was reached.")
      throw CancellationError()
    } catch is TimeoutError {
      print("stream didn't emit \(search) in within timeout (\(timeout)).")
      throw TimeoutError()
    } catch {
      print("stream throwed an error within timeout (\(timeout)).")
      throw error
    }
  }

  /// Consumes the sequence until `predicate` matches or `timeout` seconds pass.
  ///
  /// - Parameters:
  ///   - predicate: Test applied to each element; iteration stops at the first match.
  ///   - timeout: Maximum time to wait, in seconds.
  /// - Throws: Whatever `withTimeout` or the sequence throws.
  private func _until(
    where predicate: @Sendable @escaping (Self.Element) async throws -> Bool,
    timeout: TimeInterval
  ) async throws {
    try await withTimeout(timeout) {
      // Explicit `self`: the sequence is sent into the child task, which is
      // only sound because the extension requires `Self: Sendable`.
      _ = try await self.first(where: predicate)
    }
  }
}
/// Races two asynchronous operations and returns the outcome of whichever
/// finishes first; the other one is cancelled.
///
/// - Parameters:
///   - f1: The first operation.
///   - f2: The second operation.
/// - Returns: The value produced by the operation that completes first.
/// - Throws: `CancellationError` if the current task is already cancelled or
///   gets cancelled before both child tasks could be added; otherwise the
///   error of the first operation to finish, if it finished by throwing.
func firstOf<R: Sendable>(
  _ f1: @Sendable @escaping () async throws -> R,
  or f2: @Sendable @escaping () async throws -> R
) async throws -> R {
  // Bail out early so no task group is created for an already cancelled task.
  try Task.checkCancellation()
  return try await withThrowingTaskGroup(of: R.self) { group in
    // Whatever way we leave the group, the remaining child must not keep running.
    defer { group.cancelAll() }

    // `addTaskUnlessCancelled` re-checks cancellation itself, so no separate
    // check is needed here. The second task is only added if the first was.
    guard
      group.addTaskUnlessCancelled(operation: { try await f1() }),
      group.addTaskUnlessCancelled(operation: { try await f2() })
    else {
      throw CancellationError()
    }

    guard let winner = try await group.next() else {
      // Two child tasks were added above, so `next()` cannot report an empty group.
      fatalError("firstOf: task group was empty although two child tasks were added")
    }
    return winner
  }
}
/// Runs `work` and fails with `TimeoutError` if it has not finished within
/// `seconds`.
///
/// - Parameters:
///   - seconds: Maximum time to wait. Negative or NaN values are treated as
///     zero; values too large for `UInt64` nanoseconds are clamped.
///   - work: The operation to run under the time limit.
/// - Returns: The value produced by `work`.
/// - Throws: `TimeoutError` if the time limit elapses first, `CancellationError`
///   if the surrounding task is cancelled, or any error thrown by `work`.
func withTimeout<R: Sendable>(
  _ seconds: Double,
  _ work: @Sendable @escaping () async throws -> R
) async throws -> R {
  // `UInt64(_:)` traps on negative, NaN and out-of-range doubles, so convert
  // defensively instead of crashing on an unusual timeout value.
  let scaled = seconds * 1_000_000_000
  let nanoseconds: UInt64
  if scaled.isNaN || scaled <= 0 {
    nanoseconds = 0
  } else if scaled >= Double(UInt64.max) {
    nanoseconds = .max
  } else {
    nanoseconds = UInt64(scaled)
  }

  return try await firstOf {
    try await work()
  } or: {
    // Let a cancelled sleep propagate as `CancellationError`. Swallowing it
    // with `try?` made a cancelled caller look like an elapsed timeout.
    try await Task.sleep(nanoseconds: nanoseconds)
    throw TimeoutError()
  }
}
/// Error thrown by `withTimeout` once its timeout sleep has ended.
struct TimeoutError: Error {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment