Skip to content

Instantly share code, notes, and snippets.

@Alexander-Ignition
Last active September 9, 2022 15:31
Show Gist options
  • Save Alexander-Ignition/752f0d5f3cb9a5a2db76183e4eb06cda to your computer and use it in GitHub Desktop.
Save Alexander-Ignition/752f0d5f3cb9a5a2db76183e4eb06cda to your computer and use it in GitHub Desktop.
Swift Concurrent TaskQueue
/// A queue that runs asynchronous operations with a bounded level of concurrency.
///
/// At most `maximumConcurrentTaskCount` operations submitted through
/// `task(priority:operation:)` execute at the same time. Further operations are
/// suspended and are admitted in the order in which they started waiting, each
/// one as soon as a running operation finishes.
public final actor TaskQueue {
    /// Creates a new queue that runs at most one operation at a time.
    ///
    /// Every access returns a fresh, independent queue.
    public static var serial: TaskQueue {
        TaskQueue(maximumConcurrentTaskCount: 1)
    }

    /// The maximum number of operations allowed to run simultaneously.
    public let maximumConcurrentTaskCount: Int

    /// Number of slots currently held by admitted operations.
    private var _runningCount = 0

    /// Continuations of operations waiting for a free slot, oldest first.
    ///
    /// They are stored in actor-isolated state so that registering a waiter and
    /// resuming it can never interleave.
    private var _pending: [CheckedContinuation<Void, Never>] = []

    /// Creates a queue with the given concurrency limit.
    ///
    /// - Parameter maximumConcurrentTaskCount: Upper bound on simultaneously
    ///   running operations.
    /// - Precondition: `maximumConcurrentTaskCount` must be greater than zero.
    public init(maximumConcurrentTaskCount: Int) {
        precondition(
            maximumConcurrentTaskCount > 0,
            "maximumConcurrentTaskCount must be greater than zero"
        )
        self.maximumConcurrentTaskCount = maximumConcurrentTaskCount
    }

    /// Schedules `operation` to run once the queue has a free slot.
    ///
    /// The operation is invoked from a newly created unstructured task, so it
    /// may run concurrently with the caller.
    ///
    /// - Parameters:
    ///   - priority: Priority of the created task, or `nil` to let `Task`
    ///     choose it.
    ///   - operation: The work to perform.
    /// - Returns: A handle to the created task; its value is the result of
    ///   `operation`.
    @discardableResult
    nonisolated public func task<T>(
        priority: TaskPriority? = nil,
        operation: @escaping () async -> T
    ) -> Task<T, Never> {
        Task<T, Never>(priority: priority) {
            await self._start()
            // `operation` cannot throw, so the slot is released on the only
            // exit path. Awaiting the release directly (instead of spawning
            // another task) frees the slot before the result is delivered.
            let result = await operation()
            await self._finish()
            return result
        }
    }

    /// Acquires a slot, suspending the caller while the queue is full.
    private func _start() async {
        guard _runningCount >= maximumConcurrentTaskCount else {
            _runningCount += 1
            return
        }
        // The closure runs synchronously in this actor's isolation, so the
        // continuation is registered before any `_finish()` can observe
        // `_pending`; a wakeup therefore cannot be lost.
        await withCheckedContinuation { continuation in
            _pending.append(continuation)
        }
    }

    /// Releases a slot, handing it directly to the oldest waiter if there is one.
    private func _finish() {
        precondition(_runningCount > 0, "_finish() called without a matching _start()")
        if _pending.isEmpty {
            _runningCount -= 1
        } else {
            // The slot is transferred to the waiter, so the running count
            // stays the same.
            _pending.removeFirst().resume()
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment