Last active
August 1, 2024 21:03
-
-
Save mattmassicotte/bde7a4931a33a36361df0e068a41326a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
struct TimeoutError: Error { | |
} | |
extension AsyncThrowingStream.Continuation where Failure == Error { | |
func timeoutTask(with interval: TimeInterval) -> Task<Void, Error> { | |
let sleepNS = UInt64(interval * 1_000_000_000) | |
return Task { | |
// this will throw on cancel | |
try await Task.sleep(nanoseconds: sleepNS) | |
self.finish(throwing: TimeoutError()) | |
} | |
} | |
} | |
extension AsyncThrowingStream where Failure == Error, Element: Sendable { | |
func timeout(_ interval: TimeInterval) -> Self { | |
return Self { continuation in | |
Task { | |
do { | |
var timeout = continuation.timeoutTask(with: interval) | |
for try await value in self { | |
timeout.cancel() | |
continuation.yield(value) | |
timeout = continuation.timeoutTask(with: interval) | |
} | |
continuation.finish() | |
} catch { | |
continuation.finish(throwing: error) | |
} | |
} | |
} | |
} | |
} |
@jdanthinne yeah wasn't quite right. I've update it with Donny's approach, which I like more. It is still essential that Element
conform to Sendable
in this version.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@mattmassicotte shouldn't be the code inside the main
Task
wrapped in atry…catch
in order to catch the original stream errors and forward them tocontinuation.finish(throwing: error)
?