Skip to content

Instantly share code, notes, and snippets.

@ollieatkinson
Last active August 16, 2024 11:11
Show Gist options
  • Save ollieatkinson/65dd0091cc375118bfed81e6b42e318e to your computer and use it in GitHub Desktop.
Save ollieatkinson/65dd0091cc375118bfed81e6b42e318e to your computer and use it in GitHub Desktop.
A store type with support for sharing, caching, invalidating and streaming `Result` values from `source`
extension Result where Success: Sendable, Failure == Error {
    /// An actor that caches the latest `Result` read from `source` and fans it
    /// out to every registered stream continuation.
    actor SharedStore {
        /// Identifier under which a subscriber's continuation is stored.
        typealias ID = UInt

        /// Upstream sequence the store reads its values from.
        private let source: AsyncStream<Result>
        /// How long the store waits after the subscriber count reaches zero
        /// before cancelling; a non-positive duration cancels immediately.
        private let cancellingGracePeriod: Duration
        /// Clock used for the grace-period timestamp and sleep.
        private let clock: ContinuousClock

        /// Task currently iterating `source`, if one has been started.
        private var _task: Task<Void, Never>?
        /// Most recent value received from `source`; `nil` until one arrives
        /// or after `invalidate()`.
        private var _value: Result?
        /// Continuations of the streams handed out to subscribers, keyed by ID.
        private var continuations: [ID: AsyncStream<Result>.Continuation] = [:]
        /// Last identifier assigned to a continuation.
        private var count: ID = 0
        /// Subscriber count maintained by `count(change:)`.
        private var referenceCount: ID = 0
        /// Instant at which the pending grace-period cancellation was scheduled;
        /// `nil` when none is pending.
        private var cancelTimestamp: ContinuousClock.Instant?

        /// Creates a store reading from `source`.
        /// - Parameters:
        ///   - source: The stream whose values are cached and re-broadcast.
        ///   - cancellingGracePeriod: Delay before cancelling once no subscribers remain.
        ///   - clock: Clock used to measure the grace period.
        init(
            source: AsyncStream<Result>,
            cancellingGracePeriod: Duration = .seconds(60),
            clock: ContinuousClock = .continuous
        ) {
            self.source = source
            self.cancellingGracePeriod = cancellingGracePeriod
            self.clock = clock
        }

        /// Returns the cached value when there is one; otherwise delegates to
        /// `streamValueAndReturnFirst()` and returns whatever it produces.
        func get() async -> Result {
            guard let cached = _value else {
                return await streamValueAndReturnFirst()
            }
            return cached
        }

        /// Forgets the cached value and the reference to the streaming task.
        ///
        /// The task is only dropped, not cancelled. When subscribers are still
        /// registered, streaming is started again so they keep receiving values.
        func invalidate() {
            _value = nil
            _task = nil
            guard !continuations.isEmpty else { return }
            Task { await streamValueAndReturnFirst() }
        }
    }
}
extension Result.SharedStore {
    /// Buffering policy applied to streams returned by `stream(bufferingPolicy:)`.
    typealias BufferingPolicy = AsyncStream<Result>.Continuation.BufferingPolicy

    /// Returns a new stream whose continuation is registered with the store.
    /// - Parameter bufferingPolicy: How undelivered values are buffered for this subscriber.
    /// - Returns: A stream that receives the cached value (if any) and every later value.
    func stream(bufferingPolicy: BufferingPolicy = .bufferingNewest(1)) -> AsyncStream<Result> {
        AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
            insert(continuation)
        }
    }

    /// Starts a task that iterates `source`, caching and broadcasting every
    /// value, and returns the first value that task receives.
    ///
    /// If `source` finishes before delivering a value the call returns
    /// `.failure(Error.failedToPopulateCache)` instead of suspending forever.
    @discardableResult
    private func streamValueAndReturnFirst() async -> Result {
        // The streaming task is created inside the continuation body so the
        // continuation is always available before the task can observe a value.
        await withCheckedContinuation { (firstValue: CheckedContinuation<Result, Never>) in
            _task = Task<Void, Never> {
                // Task-local so it is resumed exactly once.
                var waiter: CheckedContinuation<Result, Never>? = firstValue
                for await value in source {
                    let value = yield(value)
                    _value = value
                    waiter?.resume(returning: value)
                    waiter = nil
                }
                // `source` ended without producing anything for this waiter.
                waiter?.resume(returning: .failure(Error.failedToPopulateCache))
            }
        }
    }

    /// Registers `continuation` as a subscriber.
    ///
    /// The cached value is delivered immediately when present; otherwise
    /// streaming from `source` is kicked off. When the subscriber's stream
    /// terminates, its continuation is removed and the subscriber count is
    /// decremented.
    private func insert(_ continuation: AsyncStream<Result>.Continuation) {
        // Task that kicks off streaming for this subscriber, if one was needed.
        let primingTask: Task<Void, Never>?
        if let value = _value {
            continuation.yield(value)
            primingTask = nil
        } else {
            primingTask = Task { await streamValueAndReturnFirst() }
        }
        count(change: +1)
        let id = count + 1
        count = id
        continuations[id] = continuation
        continuation.onTermination = { @Sendable [weak self, primingTask] _ in
            primingTask?.cancel()
            // `onTermination` is synchronous; hop back onto the actor to clean up.
            Task { [weak self] in
                await self?.remove(continuation: id)
                await self?.count(change: -1)
            }
        }
    }

    /// Sends `result` to every registered continuation and returns it unchanged.
    private func yield(_ result: Result) -> Result {
        for continuation in continuations.values {
            continuation.yield(result)
        }
        return result
    }

    /// Applies `change` to the subscriber count and schedules or performs
    /// cancellation when the count reaches zero.
    ///
    /// The new count is always stored (clamped at zero). Previously the count
    /// was left untouched on the transition to zero, so a subscriber arriving
    /// during the grace period inflated it and the store never cancelled again.
    /// - Parameter change: Signed delta, `+1` on subscribe and `-1` on termination.
    private func count(change: Int) {
        let updated = max(0, Int(referenceCount) + change)
        referenceCount = UInt(updated)
        guard updated == 0 else {
            // Still in use: abort any pending grace-period cancellation.
            cancelTimestamp = nil
            return
        }
        guard cancellingGracePeriod > .zero else {
            cancel()
            return
        }
        let timestamp = clock.now
        cancelTimestamp = timestamp
        Task { [weak self, clock, sleep = cancellingGracePeriod] in
            try await Task.sleep(for: sleep, clock: clock)
            // Only cancel if no subscriber arrived (and no newer timer started) meanwhile.
            guard await self?.cancelTimestamp == timestamp else { return }
            await self?.cancel()
        }
    }

    /// Resets the subscriber count and invalidates the cached state.
    private func cancel() {
        referenceCount = 0
        invalidate()
    }

    /// Removes the continuation registered under `id`, if any.
    private func remove(continuation id: ID) {
        continuations.removeValue(forKey: id)
    }

    /// Errors produced by the store itself.
    enum Error: Swift.Error {
        /// `source` finished without delivering a value to cache.
        case failedToPopulateCache
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment