Last active
September 9, 2022 15:31
-
-
Save Alexander-Ignition/752f0d5f3cb9a5a2db76183e4eb06cda to your computer and use it in GitHub Desktop.
Swift Concurrent TaskQueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A queue that runs asynchronous operations with a bounded level of concurrency.
///
/// Every operation submitted through ``task(priority:operation:)`` is wrapped in
/// its own unstructured `Task`. Before the operation body runs, the task acquires
/// one of `maximumConcurrentTaskCount` slots from the queue; if none is free it
/// suspends until a running operation finishes and hands its slot over.
///
/// Waiting tasks are resumed in the order in which they reached the queue.
/// Cancelling a returned task does not remove it from the queue: the operation
/// still runs once a slot is available and may inspect `Task.isCancelled` itself.
public final actor TaskQueue {

    /// A new queue that runs at most one operation at a time.
    public static var serial: TaskQueue {
        TaskQueue(maximumConcurrentTaskCount: 1)
    }

    /// The maximum number of operations that may run simultaneously.
    public let maximumConcurrentTaskCount: Int

    /// Continuations of tasks that are suspended waiting for a free slot,
    /// in arrival order.
    private var _waiters: [CheckedContinuation<Void, Never>] = []

    /// Number of slots currently held by running operations.
    private var _runningCount = 0

    /// Creates a queue.
    ///
    /// - Parameter maximumConcurrentTaskCount: Upper bound on simultaneously
    ///   running operations. Must be greater than zero.
    public init(maximumConcurrentTaskCount: Int) {
        precondition(
            maximumConcurrentTaskCount > 0,
            "maximumConcurrentTaskCount must be greater than zero"
        )
        self.maximumConcurrentTaskCount = maximumConcurrentTaskCount
    }

    /// Schedules `operation` to run as soon as the queue has a free slot.
    ///
    /// - Parameters:
    ///   - priority: Priority of the task that wraps the operation, or `nil`
    ///     to let `Task` choose.
    ///   - operation: The asynchronous work to perform.
    /// - Returns: A handle to the task; its value is the result of `operation`.
    @discardableResult
    nonisolated public func task<T>(
        priority: TaskPriority? = nil,
        operation: @escaping () async -> T
    ) -> Task<T, Never> {
        Task<T, Never>(priority: priority) {
            await self._acquireSlot()
            let result = await operation()
            // `operation` cannot throw, so a plain call is sufficient to
            // guarantee the slot is released; no detached cleanup task needed.
            await self._releaseSlot()
            return result
        }
    }

    /// Takes a slot immediately if one is free, otherwise suspends until a
    /// finishing operation hands its slot over.
    private func _acquireSlot() async {
        if _runningCount < maximumConcurrentTaskCount {
            _runningCount += 1
            return
        }
        // The continuation is enqueued from inside this actor-isolated call,
        // before the task suspends. `_releaseSlot()` therefore can never try
        // to wake a waiter whose continuation has not been stored yet.
        await withCheckedContinuation { continuation in
            _waiters.append(continuation)
        }
    }

    /// Gives up the caller's slot, passing it to the oldest waiter if any.
    private func _releaseSlot() {
        precondition(_runningCount > 0, "_releaseSlot() called without a held slot")
        if _waiters.isEmpty {
            _runningCount -= 1
        } else {
            // The slot is transferred directly to the next waiter, so the
            // running count stays the same.
            _waiters.removeFirst().resume()
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment