Skip to content

Instantly share code, notes, and snippets.

@thomsmed
Last active February 24, 2024 01:23
Show Gist options
  • Save thomsmed/f1fc649296568029a99c79c7aa162b67 to your computer and use it in GitHub Desktop.
Save thomsmed/f1fc649296568029a99c79c7aa162b67 to your computer and use it in GitHub Desktop.
Various variants of a Resource Cache using primitives from Swift Concurrency.
import Foundation
import Atomics
/// A thread/concurrency context safe value using [Swift Atomics](https://github.com/apple/swift-atomics).
private final class AtomicCount: Sendable {
private let protectedValue = ManagedAtomic<Int>(0)
var value: Int {
protectedValue.load(ordering: .acquiring)
}
func increment() {
protectedValue.wrappingIncrement(ordering: .relaxed)
}
func decrement() {
protectedValue.wrappingDecrement(ordering: .relaxed)
}
func reset() {
protectedValue.store(0, ordering: .relaxed)
}
}
/// A thread/concurrency context safe value using [NSLock](https://developer.apple.com/documentation/foundation/nslock)
private final class LockedCount: @unchecked Sendable {
private let lock = NSLock()
private var protectedValue = 0
var value: Int {
lock.withLock { protectedValue }
}
func increment() {
lock.withLock { protectedValue += 1 }
}
func decrement() {
lock.withLock { protectedValue -= 1 }
}
func reset() {
lock.withLock { protectedValue = 0 }
}
}
/// A thread/concurrency context safe storage for Continuations using [NSLock](https://developer.apple.com/documentation/foundation/nslock)
private final class WaitingContinuationsLocker: @unchecked Sendable {
private var protectedWaitingContinuations: [UUID: CheckedContinuation<String?, Never>] = [:]
private let lock = NSLock()
var count: Int {
lock.withLock { protectedWaitingContinuations.count }
}
func set(_ waitingContinuation: CheckedContinuation<String?, Never>, forId id: UUID) {
_ = lock.withLock { protectedWaitingContinuations.updateValue(waitingContinuation, forKey: id) }
}
func popFirst() -> (UUID, CheckedContinuation<String?, Never>)? {
lock.withLock { protectedWaitingContinuations.popFirst() }
}
func remove(forId id: UUID) -> CheckedContinuation<String?, Never>? {
lock.withLock { protectedWaitingContinuations.removeValue(forKey: id) }
}
}
/// A resource cache that spawns an unstructured Task to fetch a resource when the resource is first requested.
/// All subsequent Tasks requesting the resource will wait on the completion of the spawned unstructured resource fetching Task.
/// If all resource requesting Tasks are canceled, the resource cache will also cancel the spawned unstructured resource fetching Task.
final actor TaskSpawningCancelingResourceCache {
private let urlSession: URLSession
private var cachedResource: String?
private var resourceFetchingTask: Task<String?, Never>?
private let waitingTasksCount = AtomicCount() // Alternative 1
// private let waitingTasksCount = LockedCount() // Alternative 2
init(urlSession: URLSession = .shared) {
self.urlSession = urlSession
}
// MARK: - Private
private func fetchResource() async -> String? {
let url = URL(string: "https://pokeapi.co/api/v2/pokemon/charmander")!
let request = URLRequest(url: url)
do {
let (data, _) = try await urlSession.data(for: request)
guard let resource = String(data: data, encoding: .utf8) else {
return nil
}
return resource
} catch {
// Ignoring proper Error handling for simplicity.
return nil
}
}
// MARK: - Public
var resource: String? {
get async {
if let cachedResource {
return cachedResource
}
let task: Task<String?, Never>
if let existingTask = resourceFetchingTask {
task = existingTask
} else {
waitingTasksCount.reset()
task = Task { [weak self] in
guard let self else {
return nil
}
// One could imagine this Task having to fetch and combine multiple resources,
// then it might be desirable to check for cancelation before and after each chunk of async work.
if Task.isCancelled {
return nil
}
let resource = await self.fetchResource()
if Task.isCancelled {
return nil
}
return resource
}
}
resourceFetchingTask = task
waitingTasksCount.increment()
return await withTaskCancellationHandler {
let resource = await task.value
cachedResource = resource
resourceFetchingTask = nil
return resource
} onCancel: {
waitingTasksCount.decrement()
if waitingTasksCount.value > 0 {
return
}
// Cancel the resource fetching Task,
// since there are no longer any other Tasks waiting for the result.
task.cancel()
}
}
}
var freshResource: String? {
get async {
cachedResource = nil
return await resource
}
}
}
/// A resource cache that spawns an unstructured Task to fetch a resource when the resource is first requested.
/// All subsequent Tasks requesting the resource will wait on the completion of the spawned unstructured resource fetching Task.
final actor TaskSpawningResourceCache {
private let urlSession: URLSession
private var cachedResource: String?
private var resourceFetchingTask: Task<String?, Never>?
init(urlSession: URLSession = .shared) {
self.urlSession = urlSession
}
// MARK: - Private
private func fetchResource() async -> String? {
let url = URL(string: "https://pokeapi.co/api/v2/pokemon/charmander")!
let request = URLRequest(url: url)
do {
let (data, _) = try await urlSession.data(for: request)
guard let resource = String(data: data, encoding: .utf8) else {
return nil
}
return resource
} catch {
// Ignoring proper Error handling for simplicity.
return nil
}
}
// MARK: - Public
var resource: String? {
get async {
if let cachedResource {
return cachedResource
}
let task = resourceFetchingTask ?? Task { [weak self] in
guard let self else {
return nil
}
return await self.fetchResource()
}
resourceFetchingTask = task
let resource = await task.value
cachedResource = resource
resourceFetchingTask = nil
return resource
}
}
var freshResource: String? {
get async {
cachedResource = nil
return await resource
}
}
}
/// A resource cache that will make the first resource requesting Task fetch the resource asynchronously while being suspended.
/// All subsequent Tasks requesting the resource will be suspended, and a Continuation will be created and stored for each Task.
/// Continuations and Tasks will then be resumed once the initial Task has finished fetching the resource.
/// If the initial resource requesting Task is canceled,
/// all subsequent Continuations/Tasks will be resumed with a partially or unfinished result (empty resource in our case).
final actor ContinuationCreatingCancelingResourceCache {
private enum CachedResourceState {
case none
case fetching
case value(String?)
}
private let urlSession: URLSession
private let waitingContinuationsLocker = WaitingContinuationsLocker()
private var cachedResourceState: CachedResourceState = .none
init(urlSession: URLSession = .shared) {
self.urlSession = urlSession
}
// MARK: - Private
private func fetchResource() async -> String? {
let url = URL(string: "https://pokeapi.co/api/v2/pokemon/bulbasaur")!
let request = URLRequest(url: url)
do {
let (data, _) = try await urlSession.data(for: request)
guard let resource = String(data: data, encoding: .utf8) else {
return nil
}
return resource
} catch {
// Ignoring proper Error handling for simplicity.
return nil
}
}
// MARK: - Public
var resource: String? {
get async {
switch cachedResourceState {
case .none:
cachedResourceState = .fetching
let resource = await fetchResource()
cachedResourceState = .value(resource)
while let (_, waitingContinuation) = waitingContinuationsLocker.popFirst() {
// Resume suspended Continuations/Tasks by returning the fetched resource.
waitingContinuation.resume(returning: resource)
}
return resource
case .fetching:
let id = UUID()
return await withTaskCancellationHandler {
await withCheckedContinuation { continuation in
waitingContinuationsLocker.set(continuation, forId: id)
}
} onCancel: {
guard let waitingContinuation = waitingContinuationsLocker.remove(forId: id) else {
return
}
// Resource requesting Task was canceled while being suspended.
// Resume the Continuation/Task by returning a partially or unfinished result (empty resource in our case).
waitingContinuation.resume(returning: nil)
}
case let .value(resource):
return resource
}
}
}
var freshResource: String? {
get async {
cachedResourceState = .none
return await resource
}
}
}
/// A resource cache that will make the first resource requesting Task fetch the resource asynchronously while being suspended.
/// All subsequent Tasks requesting the resource will be suspended, and a Continuation will be created and stored for each Task.
/// Continuations and Tasks will then be resumed once the initial Task has finished fetching the resource.
final actor ContinuationCreatingResourceCache {
private enum CachedResourceState {
case none
case fetching
case value(String?)
}
private let urlSession: URLSession
private let waitingContinuationsLocker = WaitingContinuationsLocker()
private var cachedResourceState: CachedResourceState = .none
init(urlSession: URLSession = .shared) {
self.urlSession = urlSession
}
// MARK: - Private
private func fetchResource() async -> String? {
let url = URL(string: "https://pokeapi.co/api/v2/pokemon/bulbasaur")!
let request = URLRequest(url: url)
do {
let (data, _) = try await urlSession.data(for: request)
guard let resource = String(data: data, encoding: .utf8) else {
return nil
}
return resource
} catch {
// Ignoring proper Error handling for simplicity.
return nil
}
}
// MARK: - Public
var resource: String? {
get async {
switch cachedResourceState {
case .none:
cachedResourceState = .fetching
let resource = await fetchResource()
cachedResourceState = .value(resource)
while let (_, waitingContinuation) = waitingContinuationsLocker.popFirst() {
// Resume suspended Continuations/Tasks by returning the fetched resource.
waitingContinuation.resume(returning: resource)
}
return resource
case .fetching:
let id = UUID()
return await withCheckedContinuation { continuation in
waitingContinuationsLocker.set(continuation, forId: id)
}
case let .value(resource):
return resource
}
}
}
var freshResource: String? {
get async {
cachedResourceState = .none
return await resource
}
}
}
/// A resource cache that spawns an unstructured Task to fetch a resource when the resource is first requested.
/// All Tasks requesting the resource will be suspended, and a Continuation will be created and stored for each Task.
/// Continuations and Tasks will then be resumed once the initial Task has finished fetching the resource.
/// If a resource requesting Task is canceled while suspended and waiting for the unstructured resource fetching Task,
/// it will be resumed with a partially or unfinished result (empty resource in our case).
/// If all resource requesting Tasks are canceled before the unstructured resource fetching Task has finished,
/// the resource fetching Task will also be canceled (preventing unnecessary resources from being fetched).
final actor ContinuationCreatingTaskSpawningCancelingResourceCache {
/// A thread/concurrency context safe storage for Continuations, CachedResourceState and resource fetching Task using [NSLock](https://developer.apple.com/documentation/foundation/nslock)
private final class Locker: @unchecked Sendable {
private var protectedWaitingContinuations: [UUID: CheckedContinuation<String?, Never>] = [:]
private var protectedCachedResourceState: CachedResourceState = .none
private var protectedResourceFetchingTask: Task<Void, Never>? = nil
private let lock = NSLock()
var cachedResourceState: CachedResourceState {
get { lock.withLock { protectedCachedResourceState } }
set { lock.withLock { protectedCachedResourceState = newValue } }
}
var resourceFetchingTask: Task<Void, Never>? {
get { lock.withLock { protectedResourceFetchingTask } }
set { lock.withLock { protectedResourceFetchingTask = newValue } }
}
var continuationsCount: Int {
lock.withLock { protectedWaitingContinuations.count }
}
func setContinuation(_ waitingContinuation: CheckedContinuation<String?, Never>, forId id: UUID) {
_ = lock.withLock { protectedWaitingContinuations.updateValue(waitingContinuation, forKey: id) }
}
func popFirstContinuation() -> (UUID, CheckedContinuation<String?, Never>)? {
lock.withLock { protectedWaitingContinuations.popFirst() }
}
func removeContinuation(forId id: UUID) -> CheckedContinuation<String?, Never>? {
lock.withLock { protectedWaitingContinuations.removeValue(forKey: id) }
}
}
private enum CachedResourceState {
case none
case fetching
case value(String?)
}
private let urlSession: URLSession
private let locker = Locker()
init(urlSession: URLSession = .shared) {
self.urlSession = urlSession
}
// MARK: - Private
private func fetchResource() async -> String? {
let url = URL(string: "https://pokeapi.co/api/v2/pokemon/bulbasaur")!
let request = URLRequest(url: url)
do {
let (data, _) = try await urlSession.data(for: request)
guard let resource = String(data: data, encoding: .utf8) else {
return nil
}
return resource
} catch {
// Ignoring proper Error handling for simplicity.
return nil
}
}
// MARK: - Public
var resource: String? {
get async {
switch locker.cachedResourceState {
case .none:
locker.cachedResourceState = .fetching
locker.resourceFetchingTask = Task { [weak self] in
guard let self else {
return
}
var resource: String? = nil
defer {
while let (_, waitingContinuation) = self.locker.popFirstContinuation() {
// Resume suspended Continuations/Tasks by returning the fetched resource.
// If the resource requesting Task was canceled while being suspended (this Task),
// resume all waiting Continuations/Tasks by returning a partially or unfinished result (empty resource in our case).
waitingContinuation.resume(returning: resource)
}
}
// One could imagine this Task having to fetch and combine multiple resources,
// then it might be desirable to check for cancelation before and after each chunk of async work.
if Task.isCancelled {
return
}
resource = await self.fetchResource()
if Task.isCancelled {
return
}
self.locker.cachedResourceState = .value(resource)
}
let id = UUID()
return await withTaskCancellationHandler {
await withCheckedContinuation { continuation in
locker.setContinuation(continuation, forId: id)
}
} onCancel: {
guard let waitingContinuation = locker.removeContinuation(forId: id) else {
return
}
// Resource requesting Task was canceled while being suspended.
// Resume the Continuation/Task by returning a partially or unfinished result (empty resource in our case).
waitingContinuation.resume(returning: nil)
if locker.continuationsCount > 0 {
return
}
locker.resourceFetchingTask?.cancel()
}
case .fetching:
let id = UUID()
return await withTaskCancellationHandler {
await withCheckedContinuation { continuation in
locker.setContinuation(continuation, forId: id)
}
} onCancel: {
guard let waitingContinuation = locker.removeContinuation(forId: id) else {
return
}
// Resource requesting Task was canceled while being suspended.
// Resume the Continuation/Task by returning a partially or unfinished result (empty resource in our case).
waitingContinuation.resume(returning: nil)
if locker.continuationsCount > 0 {
return
}
locker.resourceFetchingTask?.cancel()
}
case let .value(resource):
return resource
}
}
}
var freshResource: String? {
get async {
locker.cachedResourceState = .none
return await resource
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment