Last active
August 2, 2024 11:29
-
-
Save mattmassicotte/0c5a703c0b7600ecad99b41eea215fe8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import AsyncAlgorithms | |
/// A value flowing through the merged stream: either a real element from the
/// wrapped sequence or a marker that the timer fired.
enum Event<Element> where Element: Sendable {
    /// An element produced by the wrapped sequence.
    case element(Element)

    /// The timer fired.
    case timeout
}
/// An asynchronous sequence that forwards the elements of `base` and finishes
/// as soon as a timer built from `interval`, `tolerance` and `clock` fires.
///
/// A timeout is reported the same way as normal completion: iteration simply
/// ends (`next()` returns `nil`).
///
/// NOTE(review): the timer is created once per iterator and is never restarted
/// when an element arrives, so the deadline appears to be measured from the
/// start of iteration rather than from the most recent element — confirm
/// against the `AsyncTimerSequence` documentation.
struct TimeoutAsyncSequence<Base, C: Clock>: AsyncSequence where Base: AsyncSequence, Base.Element: Sendable, Base: Sendable {
    // Declared explicitly so the element type seen by callers is `Base.Element`
    // instead of an associated type of an opaque iterator.
    typealias Element = Base.Element
    typealias AsyncIterator = TimeoutAsyncIterator

    let base: Base
    let interval: C.Instant.Duration
    let tolerance: C.Instant.Duration?
    let clock: C

    /// Creates a sequence that wraps `base` with a timer.
    ///
    /// - Parameters:
    ///   - base: The sequence whose elements are forwarded.
    ///   - interval: The interval passed to the underlying timer.
    ///   - tolerance: Optional tolerance passed to the underlying timer.
    ///   - clock: The clock the timer runs on.
    init(
        base: Base,
        for interval: C.Instant.Duration,
        tolerance: C.Instant.Duration? = nil,
        clock: C
    ) {
        self.base = base
        self.interval = interval
        self.tolerance = tolerance
        self.clock = clock
    }

    /// Iterator that races the base sequence against the timer.
    struct TimeoutAsyncIterator: AsyncIteratorProtocol {
        /// Base elements wrapped as `Event.element`.
        typealias TaggedElements = AsyncMapSequence<Base, Event<Base.Element>>
        /// Timer ticks mapped to `Event.timeout`.
        typealias TaggedTimeouts = AsyncMapSequence<AsyncTimerSequence<C>, Event<Base.Element>>

        var mergedIterator: AsyncMerge2Sequence<TaggedElements, TaggedTimeouts>.AsyncIterator

        /// Set once iteration has ended (completion, timeout or error) so that
        /// later calls to `next()` keep returning `nil`, even though the merged
        /// stream could still deliver base elements after a timer tick.
        private var isFinished = false

        init(sequence: TimeoutAsyncSequence) {
            let timer = AsyncTimerSequence(
                interval: sequence.interval,
                tolerance: sequence.tolerance,
                clock: sequence.clock
            )
            .map { _ in Event<Base.Element>.timeout }
            let elements = sequence.base.map { Event<Base.Element>.element($0) }
            let mergedSequence = merge(elements, timer)

            self.mergedIterator = mergedSequence.makeAsyncIterator()
        }

        /// Returns the next base element, or `nil` once the base sequence has
        /// finished or the timer has fired.
        ///
        /// - Throws: Rethrows any error produced by the merged stream.
        mutating func next() async throws -> Base.Element? {
            guard !isFinished else { return nil }

            do {
                switch try await mergedIterator.next() {
                case let .element(value):
                    return value
                case .timeout, nil:
                    isFinished = true
                    return nil
                }
            } catch {
                // An error is terminal as well; do not poll the merged stream again.
                isFinished = true
                throw error
            }
        }
    }

    /// Creates the iterator that merges the base sequence with the timer.
    func makeAsyncIterator() -> TimeoutAsyncIterator {
        TimeoutAsyncIterator(sequence: self)
    }
}
extension AsyncSequence where Self: Sendable, Element: Sendable {
    /// Wraps this sequence in a `TimeoutAsyncSequence` driven by the given
    /// timer configuration.
    ///
    /// - Parameters:
    ///   - interval: The interval handed to the timeout sequence's timer.
    ///   - tolerance: Optional tolerance for the timer; defaults to `nil`.
    ///   - clock: The clock the timer runs on.
    /// - Returns: A `TimeoutAsyncSequence` whose base is `self`.
    func withTimeout<C: Clock>(
        for interval: C.Instant.Duration,
        tolerance: C.Instant.Duration? = nil,
        clock: C
    ) -> TimeoutAsyncSequence<Self, C> {
        return .init(
            base: self,
            for: interval,
            tolerance: tolerance,
            clock: clock
        )
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment