Created
May 10, 2018 18:26
-
-
Save Mathieu-Gosbee-ConnectedLab/c03f28223280fdd84422447aee5e0a5a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import RxSwift | |
public enum SerialQueueState { | |
case active | |
case idle | |
} | |
class SerialQueueSubject<T>: ObservableType where T: ObservableType { | |
public var state: Observable<SerialQueueState> { | |
return Observable.combineLatest(self.active, self.list) | |
.map { $0 == nil && $1.isEmpty ? .idle : .active } | |
.distinctUntilChanged() | |
} | |
func subscribe<O>(_ observer: O) -> Disposable where O: ObserverType, E == O.E { | |
let setNextDisposable = Observable | |
.combineLatest(active, list.asObservable()) | |
.observeOn(MainScheduler.asyncInstance) | |
.subscribe(onNext: { [unowned active, unowned list] activeValue, listValue in | |
var listValue = listValue | |
if activeValue == nil && listValue.count > 0 { | |
let item = listValue.removeFirst() | |
active.onNext(item) | |
list.onNext(listValue) | |
} | |
}) | |
let executeActiveDisposable = active | |
.filter { $0 != nil } | |
.map { $0! } | |
.flatMapLatest { $0 } | |
.observeOn(MainScheduler.asyncInstance) | |
.next { [unowned self] itemValue in | |
observer.onNext(itemValue) | |
self.active.onNext(nil) | |
} | |
return CompositeDisposable(setNextDisposable, executeActiveDisposable) | |
} | |
typealias E = T.E // swiftlint:disable:this type_name | |
private var list: BehaviorSubject<[T]> = BehaviorSubject(value: []) | |
private var active: ReplaySubject<T?> = ReplaySubject.create(bufferSize: 1) | |
public init(_ value: [T] = []) { | |
list.onNext(value) | |
active.onNext(nil) | |
} | |
func push(_ object: T) { | |
let currentList = try? list.value() + [object] | |
guard currentList != nil else { | |
return | |
} | |
self.list.onNext(currentList!) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment