Last active
February 24, 2024 01:23
-
-
Save thomsmed/f1fc649296568029a99c79c7aa162b67 to your computer and use it in GitHub Desktop.
Several variants of a resource cache built on Swift Concurrency primitives.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Atomics | |
/// A counter that is safe to share across threads and concurrency contexts,
/// backed by `ManagedAtomic` from [Swift Atomics](https://github.com/apple/swift-atomics).
private final class AtomicCount: Sendable {
    /// Atomic storage for the count; starts at zero.
    private let storage = ManagedAtomic<Int>(0)

    /// The current count, loaded with acquiring ordering.
    var value: Int {
        return storage.load(ordering: .acquiring)
    }

    /// Adds one to the count (wrapping, relaxed ordering).
    func increment() {
        storage.wrappingIncrement(by: 1, ordering: .relaxed)
    }

    /// Subtracts one from the count (wrapping, relaxed ordering).
    func decrement() {
        storage.wrappingDecrement(by: 1, ordering: .relaxed)
    }

    /// Sets the count back to zero (relaxed ordering).
    func reset() {
        storage.store(0, ordering: .relaxed)
    }
}
/// A counter that is safe to share across threads and concurrency contexts,
/// guarded by an [NSLock](https://developer.apple.com/documentation/foundation/nslock).
private final class LockedCount: @unchecked Sendable {
    /// Guards every access to `protectedValue`.
    private let lock = NSLock()
    /// The count itself; only touched while `lock` is held.
    private var protectedValue = 0

    /// The current count.
    var value: Int {
        lock.lock()
        defer { lock.unlock() }
        return protectedValue
    }

    /// Adds one to the count.
    func increment() {
        lock.lock()
        defer { lock.unlock() }
        protectedValue += 1
    }

    /// Subtracts one from the count.
    func decrement() {
        lock.lock()
        defer { lock.unlock() }
        protectedValue -= 1
    }

    /// Sets the count back to zero.
    func reset() {
        lock.lock()
        defer { lock.unlock() }
        protectedValue = 0
    }
}
/// Storage for suspended Continuations, keyed by id, that is safe to share across threads and
/// concurrency contexts. Guarded by an [NSLock](https://developer.apple.com/documentation/foundation/nslock).
private final class WaitingContinuationsLocker: @unchecked Sendable {
    /// Continuations waiting to be resumed; only touched while `lock` is held.
    private var protectedWaitingContinuations: [UUID: CheckedContinuation<String?, Never>] = [:]
    /// Guards every access to `protectedWaitingContinuations`.
    private let lock = NSLock()

    /// Number of Continuations currently stored.
    var count: Int {
        lock.lock()
        defer { lock.unlock() }
        return protectedWaitingContinuations.count
    }

    /// Stores `waitingContinuation` under `id`, replacing any Continuation already stored for that id.
    func set(_ waitingContinuation: CheckedContinuation<String?, Never>, forId id: UUID) {
        lock.lock()
        defer { lock.unlock() }
        protectedWaitingContinuations[id] = waitingContinuation
    }

    /// Removes and returns one stored (id, Continuation) pair, or `nil` when nothing is stored.
    func popFirst() -> (UUID, CheckedContinuation<String?, Never>)? {
        lock.lock()
        defer { lock.unlock() }
        guard let entry = protectedWaitingContinuations.popFirst() else {
            return nil
        }
        return (entry.key, entry.value)
    }

    /// Removes and returns the Continuation stored under `id`, or `nil` when there is none.
    func remove(forId id: UUID) -> CheckedContinuation<String?, Never>? {
        lock.lock()
        defer { lock.unlock() }
        return protectedWaitingContinuations.removeValue(forKey: id)
    }
}
/// A resource cache that spawns an unstructured Task to fetch a resource when the resource is first requested.
/// All subsequent Tasks requesting the resource will wait on the completion of the spawned unstructured resource fetching Task.
/// If all resource requesting Tasks are canceled, the resource cache will also cancel the spawned unstructured resource fetching Task.
final actor TaskSpawningCancelingResourceCache {
    private let urlSession: URLSession

    /// The last successfully published resource, if any.
    private var cachedResource: String?
    /// The fetch currently in flight, if any.
    private var resourceFetchingTask: Task<String?, Never>?

    /// Number of requesting Tasks waiting on the in-flight fetch that have not been canceled.
    private let waitingTasksCount = AtomicCount() // Alternative 1
//    private let waitingTasksCount = LockedCount() // Alternative 2

    init(urlSession: URLSession = .shared) {
        self.urlSession = urlSession
    }

    // MARK: - Private

    /// Requests the hard-coded URL and decodes the response body as UTF-8.
    /// - Returns: The decoded body, or `nil` if the request throws or the body is not valid UTF-8.
    private func fetchResource() async -> String? {
        let url = URL(string: "https://pokeapi.co/api/v2/pokemon/charmander")!
        let request = URLRequest(url: url)

        do {
            let (data, _) = try await urlSession.data(for: request)
            return String(data: data, encoding: .utf8)
        } catch {
            // Ignoring proper Error handling for simplicity.
            return nil
        }
    }

    // MARK: - Public

    /// The cached resource, or the result of the (possibly shared) in-flight fetch.
    ///
    /// Returns `nil` when the fetch fails or the fetching Task was canceled.
    var resource: String? {
        get async {
            if let cachedResource {
                return cachedResource
            }

            let task: Task<String?, Never>
            if let existingTask = resourceFetchingTask {
                task = existingTask
            } else {
                waitingTasksCount.reset()

                task = Task { [weak self] in
                    guard let self else {
                        return nil
                    }

                    // One could imagine this Task having to fetch and combine multiple resources,
                    // then it might be desirable to check for cancelation before and after each chunk of async work.

                    if Task.isCancelled {
                        return nil
                    }

                    let resource = await self.fetchResource()

                    if Task.isCancelled {
                        return nil
                    }

                    return resource
                }

                resourceFetchingTask = task
            }

            waitingTasksCount.increment()

            let resource = await withTaskCancellationHandler {
                await task.value
            } onCancel: {
                waitingTasksCount.decrement()

                if waitingTasksCount.value > 0 {
                    return
                }

                // Cancel the resource fetching Task,
                // since there are no longer any other Tasks waiting for the result.
                task.cancel()
            }

            // The actor is reentrant: while this Task was suspended, another waiter may already have
            // published this result, and a newer fetch may have been started since.
            // Only publish (and clear the in-flight Task) if `task` is still the current fetch,
            // so a late waiter never overwrites newer state with a stale result.
            if resourceFetchingTask == task {
                cachedResource = resource
                resourceFetchingTask = nil
            }

            return resource
        }
    }

    /// Drops the cached resource and then returns `resource`.
    var freshResource: String? {
        get async {
            cachedResource = nil
            return await resource
        }
    }
}
/// A resource cache that spawns an unstructured Task to fetch a resource when the resource is first requested.
/// All subsequent Tasks requesting the resource will wait on the completion of the spawned unstructured resource fetching Task.
final actor TaskSpawningResourceCache {
    private let urlSession: URLSession

    /// The last successfully published resource, if any.
    private var cachedResource: String?
    /// The fetch currently in flight, if any.
    private var resourceFetchingTask: Task<String?, Never>?

    init(urlSession: URLSession = .shared) {
        self.urlSession = urlSession
    }

    // MARK: - Private

    /// Requests the hard-coded URL and decodes the response body as UTF-8.
    /// - Returns: The decoded body, or `nil` if the request throws or the body is not valid UTF-8.
    private func fetchResource() async -> String? {
        let url = URL(string: "https://pokeapi.co/api/v2/pokemon/charmander")!
        let request = URLRequest(url: url)

        do {
            let (data, _) = try await urlSession.data(for: request)
            return String(data: data, encoding: .utf8)
        } catch {
            // Ignoring proper Error handling for simplicity.
            return nil
        }
    }

    // MARK: - Public

    /// The cached resource, or the result of the (possibly shared) in-flight fetch.
    ///
    /// Returns `nil` when the fetch fails.
    var resource: String? {
        get async {
            if let cachedResource {
                return cachedResource
            }

            let task: Task<String?, Never>
            if let existingTask = resourceFetchingTask {
                task = existingTask
            } else {
                task = Task { [weak self] in
                    guard let self else {
                        return nil
                    }

                    return await self.fetchResource()
                }
                resourceFetchingTask = task
            }

            let resource = await task.value

            // The actor is reentrant: while this Task was suspended, another waiter may already have
            // published this result, and a newer fetch may have been started since.
            // Only publish (and clear the in-flight Task) if `task` is still the current fetch,
            // so a late waiter never overwrites newer state with a stale result.
            if resourceFetchingTask == task {
                cachedResource = resource
                resourceFetchingTask = nil
            }

            return resource
        }
    }

    /// Drops the cached resource and then returns `resource`.
    var freshResource: String? {
        get async {
            cachedResource = nil
            return await resource
        }
    }
}
/// A resource cache that will make the first resource requesting Task fetch the resource asynchronously while being suspended.
/// All subsequent Tasks requesting the resource will be suspended, and a Continuation will be created and stored for each Task.
/// Continuations and Tasks will then be resumed once the initial Task has finished fetching the resource.
/// If the initial resource requesting Task is canceled,
/// all subsequent Continuations/Tasks will be resumed with a partially or unfinished result (empty resource in our case).
/// Such a partially or unfinished result is not cached; the next request starts a new fetch.
final actor ContinuationCreatingCancelingResourceCache {
    private enum CachedResourceState {
        /// Nothing cached and no fetch in progress.
        case none
        /// The initial requesting Task is currently fetching.
        case fetching
        /// A fetch finished with this result.
        case value(String?)
    }

    private let urlSession: URLSession
    /// Continuations of the Tasks suspended while the initial Task is fetching.
    private let waitingContinuationsLocker = WaitingContinuationsLocker()

    private var cachedResourceState: CachedResourceState = .none

    init(urlSession: URLSession = .shared) {
        self.urlSession = urlSession
    }

    // MARK: - Private

    /// Requests the hard-coded URL and decodes the response body as UTF-8.
    /// - Returns: The decoded body, or `nil` if the request throws or the body is not valid UTF-8.
    private func fetchResource() async -> String? {
        let url = URL(string: "https://pokeapi.co/api/v2/pokemon/bulbasaur")!
        let request = URLRequest(url: url)

        do {
            let (data, _) = try await urlSession.data(for: request)
            return String(data: data, encoding: .utf8)
        } catch {
            // Ignoring proper Error handling for simplicity.
            return nil
        }
    }

    // MARK: - Public

    /// The cached resource, a freshly fetched resource (first requester),
    /// or the result handed over by the fetching Task (subsequent requesters).
    var resource: String? {
        get async {
            switch cachedResourceState {
            case .none:
                cachedResourceState = .fetching

                let resource = await fetchResource()

                // If this (fetching) Task was canceled, the result is partial or unfinished.
                // Do not cache it, otherwise every later request would keep receiving it
                // until `freshResource` is requested.
                if Task.isCancelled {
                    cachedResourceState = .none
                } else {
                    cachedResourceState = .value(resource)
                }

                while let (_, waitingContinuation) = waitingContinuationsLocker.popFirst() {
                    // Resume suspended Continuations/Tasks by returning the fetched resource.
                    waitingContinuation.resume(returning: resource)
                }

                return resource

            case .fetching:
                let id = UUID()

                return await withTaskCancellationHandler {
                    await withCheckedContinuation { continuation in
                        waitingContinuationsLocker.set(continuation, forId: id)
                    }
                } onCancel: {
                    // Whoever removes the Continuation from the locker is the one that resumes it,
                    // so it is resumed at most once (either here or by the fetching Task).
                    guard let waitingContinuation = waitingContinuationsLocker.remove(forId: id) else {
                        return
                    }

                    // Resource requesting Task was canceled while being suspended.
                    // Resume the Continuation/Task by returning a partially or unfinished result (empty resource in our case).
                    waitingContinuation.resume(returning: nil)
                }

            case let .value(resource):
                return resource
            }
        }
    }

    /// Discards the cached state and then returns `resource`.
    var freshResource: String? {
        get async {
            cachedResourceState = .none
            return await resource
        }
    }
}
/// A resource cache in which the first requesting Task performs the fetch itself while suspended.
/// Tasks requesting the resource during that fetch are suspended, each with a stored Continuation,
/// and are resumed with the result once the first Task has finished fetching.
final actor ContinuationCreatingResourceCache {
    private enum CachedResourceState {
        /// Nothing cached and no fetch in progress.
        case none
        /// The first requesting Task is currently fetching.
        case fetching
        /// A fetch finished with this result.
        case value(String?)
    }

    private let urlSession: URLSession
    /// Continuations of the Tasks suspended while the first Task is fetching.
    private let waitingContinuationsLocker = WaitingContinuationsLocker()

    private var cachedResourceState: CachedResourceState = .none

    init(urlSession: URLSession = .shared) {
        self.urlSession = urlSession
    }

    // MARK: - Private

    /// Requests the hard-coded URL and decodes the response body as UTF-8.
    /// - Returns: The decoded body, or `nil` if the request throws or the body is not valid UTF-8.
    private func fetchResource() async -> String? {
        let request = URLRequest(url: URL(string: "https://pokeapi.co/api/v2/pokemon/bulbasaur")!)

        do {
            let (body, _) = try await urlSession.data(for: request)
            return String(data: body, encoding: .utf8)
        } catch {
            // Ignoring proper Error handling for simplicity.
            return nil
        }
    }

    // MARK: - Public

    /// The cached result, a freshly fetched result (first requester),
    /// or the result handed over by the fetching Task (requesters arriving during the fetch).
    var resource: String? {
        get async {
            switch cachedResourceState {
            case let .value(cached):
                return cached

            case .fetching:
                let continuationId = UUID()

                // Park this Task; the fetching Task resumes it once the result is available.
                return await withCheckedContinuation { pending in
                    waitingContinuationsLocker.set(pending, forId: continuationId)
                }

            case .none:
                cachedResourceState = .fetching

                let fetched = await fetchResource()

                cachedResourceState = .value(fetched)

                // Hand the result to every Task that was parked while the fetch was running.
                while let entry = waitingContinuationsLocker.popFirst() {
                    entry.1.resume(returning: fetched)
                }

                return fetched
            }
        }
    }

    /// Discards the cached state and then returns `resource`.
    var freshResource: String? {
        get async {
            cachedResourceState = .none
            return await resource
        }
    }
}
/// A resource cache that spawns an unstructured Task to fetch a resource when the resource is first requested.
/// All Tasks requesting the resource will be suspended, and a Continuation will be created and stored for each Task.
/// Continuations and Tasks will then be resumed once the initial Task has finished fetching the resource.
/// If a resource requesting Task is canceled while suspended and waiting for the unstructured resource fetching Task,
/// it will be resumed with a partially or unfinished result (empty resource in our case).
/// If all resource requesting Tasks are canceled before the unstructured resource fetching Task has finished,
/// the resource fetching Task will also be canceled (preventing unnecessary resources from being fetched).
/// A canceled fetch leaves the cache empty, so the next request starts a new fetch.
final actor ContinuationCreatingTaskSpawningCancelingResourceCache {
    /// A thread/concurrency context safe storage for Continuations, CachedResourceState and resource fetching Task using [NSLock](https://developer.apple.com/documentation/foundation/nslock)
    private final class Locker: @unchecked Sendable {
        private var protectedWaitingContinuations: [UUID: CheckedContinuation<String?, Never>] = [:]
        private var protectedCachedResourceState: CachedResourceState = .none
        private var protectedResourceFetchingTask: Task<Void, Never>? = nil

        /// Guards every access to the `protected…` properties above.
        private let lock = NSLock()

        /// The current cache state.
        var cachedResourceState: CachedResourceState {
            get { lock.withLock { protectedCachedResourceState } }
            set { lock.withLock { protectedCachedResourceState = newValue } }
        }

        /// The most recently spawned resource fetching Task, if any.
        var resourceFetchingTask: Task<Void, Never>? {
            get { lock.withLock { protectedResourceFetchingTask } }
            set { lock.withLock { protectedResourceFetchingTask = newValue } }
        }

        /// Number of Continuations currently stored.
        var continuationsCount: Int {
            lock.withLock { protectedWaitingContinuations.count }
        }

        /// Stores `waitingContinuation` under `id`, replacing any Continuation already stored for that id.
        func setContinuation(_ waitingContinuation: CheckedContinuation<String?, Never>, forId id: UUID) {
            _ = lock.withLock { protectedWaitingContinuations.updateValue(waitingContinuation, forKey: id) }
        }

        /// Removes and returns one stored (id, Continuation) pair, or `nil` when nothing is stored.
        func popFirstContinuation() -> (UUID, CheckedContinuation<String?, Never>)? {
            lock.withLock { protectedWaitingContinuations.popFirst() }
        }

        /// Removes and returns the Continuation stored under `id`, or `nil` when there is none.
        func removeContinuation(forId id: UUID) -> CheckedContinuation<String?, Never>? {
            lock.withLock { protectedWaitingContinuations.removeValue(forKey: id) }
        }
    }

    private enum CachedResourceState {
        /// Nothing cached and no fetch in progress.
        case none
        /// A resource fetching Task has been spawned and has not finished.
        case fetching
        /// A fetch finished with this result.
        case value(String?)
    }

    private let urlSession: URLSession
    private let locker = Locker()

    init(urlSession: URLSession = .shared) {
        self.urlSession = urlSession
    }

    // MARK: - Private

    /// Requests the hard-coded URL and decodes the response body as UTF-8.
    /// - Returns: The decoded body, or `nil` if the request throws or the body is not valid UTF-8.
    private func fetchResource() async -> String? {
        let url = URL(string: "https://pokeapi.co/api/v2/pokemon/bulbasaur")!
        let request = URLRequest(url: url)

        do {
            let (data, _) = try await urlSession.data(for: request)
            return String(data: data, encoding: .utf8)
        } catch {
            // Ignoring proper Error handling for simplicity.
            return nil
        }
    }

    /// Marks the cache as fetching and spawns the unstructured resource fetching Task.
    private func startFetching() {
        locker.cachedResourceState = .fetching

        locker.resourceFetchingTask = Task { [weak self] in
            guard let self else {
                return
            }

            var resource: String? = nil
            var didFinishFetching = false

            defer {
                // A canceled fetch must not leave the state at `.fetching`:
                // nothing would ever resume the Continuations of later requesters.
                // Reset the state so the next request spawns a new fetching Task.
                if !didFinishFetching {
                    self.locker.cachedResourceState = .none
                }

                while let (_, waitingContinuation) = self.locker.popFirstContinuation() {
                    // Resume suspended Continuations/Tasks by returning the fetched resource.
                    // If the resource fetching Task (this Task) was canceled,
                    // resume all waiting Continuations/Tasks by returning a partially or unfinished result (empty resource in our case).
                    waitingContinuation.resume(returning: resource)
                }
            }

            // One could imagine this Task having to fetch and combine multiple resources,
            // then it might be desirable to check for cancelation before and after each chunk of async work.

            if Task.isCancelled {
                return
            }

            resource = await self.fetchResource()

            if Task.isCancelled {
                return
            }

            self.locker.cachedResourceState = .value(resource)
            didFinishFetching = true
        }
    }

    /// Suspends the calling Task until the resource fetching Task resumes it, or until the calling Task is canceled.
    ///
    /// NOTE(review): the state check in `resource` and the registration of the Continuation here are
    /// two separate locked steps; whether the fetching Task can finish in between depends on
    /// whether its closure runs on this actor — confirm.
    private func waitForFetchedResource() async -> String? {
        let id = UUID()

        return await withTaskCancellationHandler {
            await withCheckedContinuation { continuation in
                locker.setContinuation(continuation, forId: id)
            }
        } onCancel: {
            // Whoever removes the Continuation from the locker is the one that resumes it,
            // so it is resumed at most once (either here or by the fetching Task).
            guard let waitingContinuation = locker.removeContinuation(forId: id) else {
                return
            }

            // Resource requesting Task was canceled while being suspended.
            // Resume the Continuation/Task by returning a partially or unfinished result (empty resource in our case).
            waitingContinuation.resume(returning: nil)

            if locker.continuationsCount > 0 {
                return
            }

            // No other Task is waiting for the result anymore, so stop fetching.
            locker.resourceFetchingTask?.cancel()
        }
    }

    // MARK: - Public

    /// The cached resource, or the result of the (possibly shared) resource fetching Task.
    ///
    /// Returns `nil` when the fetch fails or the requesting Task is canceled while waiting.
    var resource: String? {
        get async {
            switch locker.cachedResourceState {
            case .none:
                startFetching()
                return await waitForFetchedResource()

            case .fetching:
                return await waitForFetchedResource()

            case let .value(resource):
                return resource
            }
        }
    }

    /// Discards the cached state and then returns `resource`.
    var freshResource: String? {
        get async {
            locker.cachedResourceState = .none
            return await resource
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment