Skip to content

Instantly share code, notes, and snippets.

@a-voronov
Last active March 10, 2022 10:43
Show Gist options
  • Save a-voronov/5cf22f4da4ac4fb2b50dd3ee56bf98b5 to your computer and use it in GitHub Desktop.
Save a-voronov/5cf22f4da4ac4fb2b50dd3ee56bf98b5 to your computer and use it in GitHub Desktop.
Async Task + Serial Queue based on ReactiveSwift ✨
import ReactiveSwift
import Result
// MARK: - Task
final class Task<V, E: Error> {
typealias ProcessingHandler = (@escaping (Result<V, E>) -> Void, DisposableBag) -> Void
enum State {
case idle
case processing
case completed(V)
case failed(E)
case cancelled
}
private let processingHandler: ProcessingHandler
private let disposable: CompositeDisposable
let state: Property<State>
private let _state: MutableProperty<State>
init(_ processingHandler: @escaping ProcessingHandler) {
_state = MutableProperty(.idle)
state = Property(_state.skipRepeats { prev, next in
// Because of using `MutableProperty.modify` method when trying to change state,
// we'll be receiving current state updates even if we ain't chnaging it.
// Thus ignore any updates if they match none of possible transition.
!Task.canChangeState(from: prev, to: next)
})
disposable = CompositeDisposable()
self.processingHandler = processingHandler
}
@discardableResult
private func tryChangeState(to newState: State) -> Bool {
return _state.modify { state in
if Task.canChangeState(from: state, to: newState) {
state = newState
return true
}
return false
}
}
private static func canChangeState(from: State, to: State) -> Bool {
switch (from, to) {
case (.idle, .processing),
(.idle, .cancelled),
(.processing, .completed),
(.processing, .failed),
(.processing, .cancelled):
return true
default:
return false
}
}
func start() {
guard tryChangeState(to: .processing) else { return }
processingHandler({ [weak self] result in
switch result {
case let .success(value): self?.tryChangeState(to: .completed(value))
case let .failure(error): self?.tryChangeState(to: .failed(error))
}
}, disposable)
}
func cancel() {
guard tryChangeState(to: .cancelled) else { return }
disposable.dispose()
}
}
extension Task {
var isIdle: Bool { return state.value.isIdle }
var isProcessing: Bool { return state.value.isProcessing }
var isCompleted: Bool { return state.value.isCompleted }
var isFailed: Bool { return state.value.isFailed }
var isCancelled: Bool { return state.value.isCancelled }
var isFinished: Bool { return state.value.isFinished }
}
extension Task.State {
var isIdle: Bool {
guard case .idle = self else { return false }
return true
}
var isProcessing: Bool {
guard case .processing = self else { return false }
return true
}
var isCompleted: Bool {
guard case .completed = self else { return false }
return true
}
var isFailed: Bool {
guard case .failed = self else { return false }
return true
}
var isCancelled: Bool {
guard case .cancelled = self else { return false }
return true
}
var isFinished: Bool {
switch self {
case .completed, .failed, .cancelled: return true
default: return false
}
}
}
// MARK: - Task Queue
class TasksQueue<V, E: Error> {
private var tasksQueue: [Task<V, E>]
private let synchronizationQueue: DispatchQueue
var tasksCount: Int {
return tasksQueue.count
}
init() {
tasksQueue = []
synchronizationQueue = DispatchQueue(label: "tasks.queue")
}
// task is started immediately after it's added to the queue if queue is empty
// otherwise it waits until all previous tasks finish
func add(task: Task<V, E>) {
synchronizationQueue.async {
// we're not interested in adding task already in progress or finished, thus it should be idle
guard task.isIdle else { return }
// once task is added
self.tasksQueue.append(task)
// see if it's the only one in queue and start it if it is
if self.tasksCount == 1 {
self.startFirstTaskInQueue()
}
}
}
private func startFirstTaskInQueue() {
// try to get first task in queue, or exit
guard let task = tasksQueue.first else { return }
// in case it's already finished, just remove it and try again with the next one in queue
guard !task.isFinished else {
self.remove(task: task)
return self.startFirstTaskInQueue()
}
// even if it's already started by someone outside of queue
// we'll be able to track its finishing state and switch to the next operation in queue
//
// subscribe to its state
task.state.producer
// so that when it finishes (no matter either with success, failure or cancellation)
.filter { $0.isFinished }
.take(first: 1)
.startWithValues { [weak self] state in
guard let strongSelf = self else { return }
// it will be removed and the next task in queue will be started
strongSelf.synchronizationQueue.async {
strongSelf.remove(task: task)
strongSelf.startFirstTaskInQueue()
}
}
// finally start the task
task.start()
}
private func remove(task: Task<V, E>) {
// remove all tasks pointing to the same instance (if someone decides to add task twice)
tasksQueue.removeAll(where: { $0 === task })
}
func cancelAllTasks() {
// cancel all tasks and remove them from the queue
synchronizationQueue.async {
self.tasksQueue.forEach { $0.cancel() }
self.tasksQueue.removeAll()
}
}
}
// MARK: - Helpers
protocol DisposableBag {
func add(_ disposable: Disposable?) -> Disposable?
}
extension CompositeDisposable: DisposableBag {}
@a-voronov
Copy link
Author

Example

extension String: Error {}

let t1 = Task<Int, String> { (completion, _) in
    DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(2)) {
        completion(.success(1))
    }
}
let t2 = Task<Int, String> { (completion, _) in
    DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(3)) {
        completion(.failure("☹️"))
    }
}
let t3 = Task<Int, String> { (completion, _) in
    DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(1)) {
        completion(.success(3))
    }
}

let queue = TasksQueue<Int, String>()

t1.state.producer.startWithValues { state in print("#1: \(state)") }
t2.state.producer.startWithValues { state in print("#2: \(state)") }
t3.state.producer.startWithValues { state in print("#3: \(state)") }

queue.add(task: t1)
queue.add(task: t2)
queue.add(task: t3)

DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(4)) {
    t3.cancel()
}

DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(7)) {
    print("total: \(queue.tasksCount)")
}

output:

#1: idle
#2: idle
#3: idle
#1: processing
#1: completed(1)
#2: processing
#3: cancelled
#2: failed("☹️")
total: 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment