Last active
March 10, 2022 10:43
-
-
Save a-voronov/5cf22f4da4ac4fb2b50dd3ee56bf98b5 to your computer and use it in GitHub Desktop.
Async Task + Serial Queue based on ReactiveSwift ✨
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ReactiveSwift | |
import Result | |
// MARK: - Task | |
final class Task<V, E: Error> { | |
typealias ProcessingHandler = (@escaping (Result<V, E>) -> Void, DisposableBag) -> Void | |
enum State { | |
case idle | |
case processing | |
case completed(V) | |
case failed(E) | |
case cancelled | |
} | |
private let processingHandler: ProcessingHandler | |
private let disposable: CompositeDisposable | |
let state: Property<State> | |
private let _state: MutableProperty<State> | |
init(_ processingHandler: @escaping ProcessingHandler) { | |
_state = MutableProperty(.idle) | |
state = Property(_state.skipRepeats { prev, next in | |
// Because of using `MutableProperty.modify` method when trying to change state, | |
// we'll be receiving current state updates even if we ain't chnaging it. | |
// Thus ignore any updates if they match none of possible transition. | |
!Task.canChangeState(from: prev, to: next) | |
}) | |
disposable = CompositeDisposable() | |
self.processingHandler = processingHandler | |
} | |
@discardableResult | |
private func tryChangeState(to newState: State) -> Bool { | |
return _state.modify { state in | |
if Task.canChangeState(from: state, to: newState) { | |
state = newState | |
return true | |
} | |
return false | |
} | |
} | |
private static func canChangeState(from: State, to: State) -> Bool { | |
switch (from, to) { | |
case (.idle, .processing), | |
(.idle, .cancelled), | |
(.processing, .completed), | |
(.processing, .failed), | |
(.processing, .cancelled): | |
return true | |
default: | |
return false | |
} | |
} | |
func start() { | |
guard tryChangeState(to: .processing) else { return } | |
processingHandler({ [weak self] result in | |
switch result { | |
case let .success(value): self?.tryChangeState(to: .completed(value)) | |
case let .failure(error): self?.tryChangeState(to: .failed(error)) | |
} | |
}, disposable) | |
} | |
func cancel() { | |
guard tryChangeState(to: .cancelled) else { return } | |
disposable.dispose() | |
} | |
} | |
extension Task { | |
var isIdle: Bool { return state.value.isIdle } | |
var isProcessing: Bool { return state.value.isProcessing } | |
var isCompleted: Bool { return state.value.isCompleted } | |
var isFailed: Bool { return state.value.isFailed } | |
var isCancelled: Bool { return state.value.isCancelled } | |
var isFinished: Bool { return state.value.isFinished } | |
} | |
extension Task.State { | |
var isIdle: Bool { | |
guard case .idle = self else { return false } | |
return true | |
} | |
var isProcessing: Bool { | |
guard case .processing = self else { return false } | |
return true | |
} | |
var isCompleted: Bool { | |
guard case .completed = self else { return false } | |
return true | |
} | |
var isFailed: Bool { | |
guard case .failed = self else { return false } | |
return true | |
} | |
var isCancelled: Bool { | |
guard case .cancelled = self else { return false } | |
return true | |
} | |
var isFinished: Bool { | |
switch self { | |
case .completed, .failed, .cancelled: return true | |
default: return false | |
} | |
} | |
} | |
// MARK: - Task Queue | |
class TasksQueue<V, E: Error> { | |
private var tasksQueue: [Task<V, E>] | |
private let synchronizationQueue: DispatchQueue | |
var tasksCount: Int { | |
return tasksQueue.count | |
} | |
init() { | |
tasksQueue = [] | |
synchronizationQueue = DispatchQueue(label: "tasks.queue") | |
} | |
// task is started immediately after it's added to the queue if queue is empty | |
// otherwise it waits until all previous tasks finish | |
func add(task: Task<V, E>) { | |
synchronizationQueue.async { | |
// we're not interested in adding task already in progress or finished, thus it should be idle | |
guard task.isIdle else { return } | |
// once task is added | |
self.tasksQueue.append(task) | |
// see if it's the only one in queue and start it if it is | |
if self.tasksCount == 1 { | |
self.startFirstTaskInQueue() | |
} | |
} | |
} | |
private func startFirstTaskInQueue() { | |
// try to get first task in queue, or exit | |
guard let task = tasksQueue.first else { return } | |
// in case it's already finished, just remove it and try again with the next one in queue | |
guard !task.isFinished else { | |
self.remove(task: task) | |
return self.startFirstTaskInQueue() | |
} | |
// even if it's already started by someone outside of queue | |
// we'll be able to track its finishing state and switch to the next operation in queue | |
// | |
// subscribe to its state | |
task.state.producer | |
// so that when it finishes (no matter either with success, failure or cancellation) | |
.filter { $0.isFinished } | |
.take(first: 1) | |
.startWithValues { [weak self] state in | |
guard let strongSelf = self else { return } | |
// it will be removed and the next task in queue will be started | |
strongSelf.synchronizationQueue.async { | |
strongSelf.remove(task: task) | |
strongSelf.startFirstTaskInQueue() | |
} | |
} | |
// finally start the task | |
task.start() | |
} | |
private func remove(task: Task<V, E>) { | |
// remove all tasks pointing to the same instance (if someone decides to add task twice) | |
tasksQueue.removeAll(where: { $0 === task }) | |
} | |
func cancelAllTasks() { | |
// cancel all tasks and remove them from the queue | |
synchronizationQueue.async { | |
self.tasksQueue.forEach { $0.cancel() } | |
self.tasksQueue.removeAll() | |
} | |
} | |
} | |
// MARK: - Helpers | |
protocol DisposableBag { | |
func add(_ disposable: Disposable?) -> Disposable? | |
} | |
extension CompositeDisposable: DisposableBag {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example
output: