|
extension Result where Success: Sendable, Failure == Error { |
|
|
|
actor SharedStore { |
|
|
|
typealias ID = UInt |
|
|
|
private let source: AsyncStream<Result> |
|
|
|
private let cancellingGracePeriod: Duration |
|
private let clock: ContinuousClock |
|
|
|
private var _task: Task<Void, Never>? |
|
private var _value: Result? |
|
|
|
private var continuations: [ID: AsyncStream<Result>.Continuation] = [:] |
|
private var count: ID = 0 |
|
private var referenceCount: ID = 0 |
|
|
|
private var cancelTimestamp: ContinuousClock.Instant? |
|
|
|
init( |
|
source: AsyncStream<Result>, |
|
cancellingGracePeriod: Duration = .seconds(60), |
|
clock: ContinuousClock = .continuous |
|
) { |
|
self.source = source |
|
self.cancellingGracePeriod = cancellingGracePeriod |
|
self.clock = clock |
|
} |
|
|
|
func get() async -> Result { |
|
if let value = _value { |
|
return value |
|
} else { |
|
return await streamValueAndReturnFirst() |
|
} |
|
} |
|
|
|
func invalidate() { |
|
_task = nil |
|
_value = nil |
|
|
|
if !continuations.isEmpty { |
|
Task { await streamValueAndReturnFirst() } |
|
} |
|
} |
|
} |
|
} |
|
|
|
extension Result.SharedStore { |
|
|
|
typealias BufferingPolicy = AsyncStream<Result>.Continuation.BufferingPolicy |
|
|
|
func stream(bufferingPolicy: BufferingPolicy = .bufferingNewest(1)) -> AsyncStream<Result> { |
|
AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in |
|
insert(continuation) |
|
} |
|
} |
|
|
|
@discardableResult |
|
private func streamValueAndReturnFirst() async -> Result { |
|
var _continuation: CheckedContinuation<Result, Never>! |
|
async let value = withCheckedContinuation { _continuation = $0 } |
|
_task = Task<Void, Never> { |
|
for await value in source { |
|
let value = yield(value) |
|
_value = value |
|
if let continuation = _continuation { |
|
continuation.resume(returning: value) |
|
_continuation = nil |
|
} |
|
} |
|
} |
|
return await value |
|
} |
|
|
|
private func insert(_ continuation: AsyncStream<Result>.Continuation) { |
|
var _task: Task<Void, Never>? |
|
if let value = _value { |
|
continuation.yield(value) |
|
} else { |
|
_task = Task { await streamValueAndReturnFirst() } |
|
} |
|
count(change: +1) |
|
let id = count + 1 |
|
count = id |
|
continuations[id] = continuation |
|
continuation.onTermination = { @Sendable [weak self, _task] _ in |
|
_task?.cancel() |
|
Task { [weak self] in |
|
await self?.remove(continuation: id) |
|
await self?.count(change: -1) |
|
} |
|
} |
|
} |
|
|
|
private func yield(_ result: Result) -> Result { |
|
for (_, continuation) in continuations { |
|
continuation.yield(result) |
|
} |
|
return result |
|
} |
|
|
|
private func count(change: Int) { |
|
let result = Int(referenceCount) + change |
|
if result > 0 { |
|
referenceCount = UInt(result) |
|
cancelTimestamp = nil |
|
} else if cancellingGracePeriod > .zero { |
|
cancelTimestamp = clock.now |
|
Task { [weak self, clock, t = cancelTimestamp, sleep = cancellingGracePeriod] in |
|
try await Task.sleep(for: sleep, clock: clock) |
|
guard await self?.cancelTimestamp == t else { return } |
|
await self?.cancel() |
|
} |
|
} else { |
|
cancel() |
|
} |
|
} |
|
|
|
private func cancel() { |
|
referenceCount = 0 |
|
invalidate() |
|
} |
|
|
|
private func remove(continuation id: ID) { |
|
continuations.removeValue(forKey: id) |
|
} |
|
|
|
enum Error: Swift.Error { |
|
case failedToPopulateCache |
|
} |
|
} |