Last active
November 14, 2020 01:34
-
-
Save cmc5788/5ed38645dca67a04bd2baa80ef4e26ce to your computer and use it in GitHub Desktop.
General RxSwift utility stuffs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import RxSwift | |
extension UIButton { | |
public func throttledTapObservable(_ throttle : RxTimeInterval = 1.0) -> RxSwift.Observable<Swift.Void> { | |
return self.rx.tap.asObservable() | |
.throttle(throttle, latest: false, scheduler: MainScheduler.instance) | |
} | |
} | |
/** Hacky way to turn the retryWhen operator into the (currently missing) repeatWhen operator. */ | |
fileprivate class CoerceToRepeat : Error { | |
} | |
extension UIViewController { | |
public func printLifecycle(_ name : String) { | |
let willAppear = self.rx.methodInvoked(#selector(self.viewWillAppear(_:))) | |
.do(onNext:{ _ in print("\(name) viewWillAppear") }) | |
let didAppear = self.rx.methodInvoked(#selector(self.viewDidAppear(_:))) | |
.do(onNext:{ _ in print("\(name) viewDidAppear") }) | |
let willDisappear = self.rx.methodInvoked(#selector(self.viewWillDisappear(_:))) | |
.do(onNext:{ _ in print("\(name) viewWillDisappear") }) | |
let didDisappear = self.rx.methodInvoked(#selector(self.viewDidDisappear(_:))) | |
.do(onNext:{ _ in print("\(name) viewDidDisappear") }) | |
_ = Observable.merge([willAppear, didAppear, willDisappear, didDisappear]) | |
.takeUntil(self.rx.deallocated) | |
.subscribe() | |
} | |
} | |
extension ObservableType { | |
public func filterFlatMap(_ filter: @escaping (Self.E) -> RxSwift.Observable<Bool>) -> RxSwift.Observable<Self.E> { | |
return self.flatMap { a -> Observable<Self.E> in | |
filter(a).take(1).flatMap { b -> Observable<Self.E> in | |
b ? Observable.just(a) : Observable.empty() | |
} | |
} | |
} | |
public func mapToVoid() -> RxSwift.Observable<Swift.Void> { | |
return self.map { _ in } | |
} | |
public func mapToString() -> RxSwift.Observable<String> { | |
return self.map { "\($0)" } | |
} | |
public func shareReplayLatestWhileConnectedWithLinger(_ delayTime : RxTimeInterval, scheduler : SchedulerType = Schedulers.main) -> RxSwift.Observable<Self.E> { | |
let sharedSource = self.concat(Observable.empty() | |
.delaySubscription(delayTime, scheduler: scheduler)) | |
.shareReplayLatestWhileConnected() | |
return Observable<Self.E>.create { observer in | |
let sub = sharedSource.subscribe(onNext: { | |
observer.on(.next($0)) | |
}, onError: { | |
observer.on(.error($0)) | |
}, onCompleted: { | |
observer.on(.completed) | |
}) | |
return Disposables.create { | |
_ = scheduler.scheduleRelative((), dueTime: delayTime) { _ in | |
sub.dispose() | |
return Disposables.create() | |
} | |
} | |
} | |
} | |
public func ignoreErrors() -> RxSwift.Observable<Self.E> { | |
return self.catchError { _ in Observable.empty() } | |
} | |
public func multiplex(_ observer : AnyObserver<Self.E>) -> RxSwift.Observable<Self.E> { | |
return self.do(onNext:{ observer.onNext($0) }) | |
} | |
public func repeatWhen<TriggerObservable : ObservableType>(_ notificationHandler: @escaping (RxSwift.Observable<Swift.Void>) -> TriggerObservable) -> RxSwift.Observable<Self.E> { | |
return self.concat(Observable<Self.E>.error(CoerceToRepeat())) | |
.retryWhen { (errObs : Observable<Swift.Error>) -> TriggerObservable in | |
return notificationHandler(errObs.flatMap { err -> Observable<Swift.Void> in | |
err is CoerceToRepeat ? Observable.just() : Observable.error(err) | |
}) | |
} | |
} | |
public func `repeat`(_ n : Int) -> RxSwift.Observable<Self.E> { | |
return self.repeatWhen { $0.flatMapWithIndex { _, i in i + 1 < n ? Observable.just() : Observable.empty() } } | |
} | |
public func `repeat`() -> RxSwift.Observable<Self.E> { | |
return self.repeatWhen { $0.flatMap { Observable.just() } } | |
} | |
public func repeatWithBackoff(_ backoffPolicy: @escaping (Int) -> RxTimeInterval, scheduler: SchedulerType) -> RxSwift.Observable<Self.E> { | |
return self.repeatWhen { | |
$0.flatMapWithIndex { _, i -> Observable<Swift.Void> in | |
let backoff = backoffPolicy(i) | |
return backoff > 0 ? Observable.just().delay(backoff, scheduler: scheduler) : Observable.empty() | |
} | |
} | |
} | |
public func retryWithBackoff(_ retryPolicy: @escaping (Int, Swift.Error) -> RxTimeInterval, scheduler: SchedulerType) -> RxSwift.Observable<Self.E> { | |
return self.retryWhen { | |
$0.flatMapWithIndex { err, i -> Observable<Swift.Void> in | |
let backoff = retryPolicy(i, err) | |
return backoff > 0 ? Observable.just().delay(backoff, scheduler: scheduler) : Observable.empty() | |
} | |
} | |
} | |
public func takeUntilDeallocOrDisappear(_ viewController : UIViewController) -> RxSwift.Observable<Self.E> { | |
return self.takeUntil(Observable.merge([ | |
viewController.rx.methodInvoked(#selector(viewController.viewDidDisappear(_:))).mapToVoid(), | |
viewController.rx.deallocated | |
])) | |
} | |
public func apply<T>(_ transform: (Observable<Self.E>) -> Observable<T>) -> Observable<T> { | |
return transform(self.asObservable()) | |
} | |
} | |
public class Schedulers { | |
public static let main = MainScheduler.instance | |
public static let db = SingleThreadImmediateScheduler("li.vin.thread.db") | |
public static let ioQueue = DispatchQueue( | |
label: "li.vin.queue.io", | |
qos: .utility, | |
attributes: [.concurrent], | |
target: nil | |
) | |
public static let io = ConcurrentDispatchQueueScheduler(queue: ioQueue) | |
public static let computationQueue = DispatchQueue( | |
label: "li.vin.queue.computation", | |
qos: .userInteractive, | |
attributes: [.concurrent], | |
target: nil | |
) | |
public static let computation = ConcurrentDispatchQueueScheduler(queue: computationQueue) | |
private init() { | |
} | |
} | |
public class SingleThreadImmediateScheduler : ImmediateSchedulerType, Disposable { | |
var started : Bool = false | |
var disposed : Bool = false | |
let lock = NSLock() | |
var actions : [() -> ()] = [] | |
lazy var thread : Thread = { | |
let thr = Thread.init { | |
while !Thread.current.isCancelled { | |
var action : (() -> ())? | |
self.lock.whileLocked { action = self.actions.popLast() } | |
action?() | |
} | |
} | |
thr.name = self.name | |
thr.qualityOfService = self.qos | |
return thr | |
}() | |
let name : String | |
let qos : QualityOfService | |
public init(_ name : String = NSUUID().uuidString, qos : QualityOfService = .utility) { | |
self.name = name | |
self.qos = qos | |
} | |
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable { | |
let cancel = SingleAssignmentDisposable() | |
self.lock.whileLocked { | |
if self.disposed { return } | |
if !started { started = true; self.thread.start() } | |
self.actions.insert({ | |
if !cancel.isDisposed { cancel.setDisposable(action(state)) } | |
}, at: 0) | |
} | |
return cancel | |
} | |
public func dispose() { | |
self.lock.whileLocked { | |
if started { self.thread.cancel() } | |
self.disposed = true | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment