Skip to content

Instantly share code, notes, and snippets.

@cmc5788
Last active November 14, 2020 01:34
Show Gist options
  • Save cmc5788/5ed38645dca67a04bd2baa80ef4e26ce to your computer and use it in GitHub Desktop.
Save cmc5788/5ed38645dca67a04bd2baa80ef4e26ce to your computer and use it in GitHub Desktop.
General RxSwift utilities
import RxSwift
extension UIButton {
    /// Returns the button's tap events, rate-limited on the main scheduler.
    ///
    /// The taps come from `rx.tap` and are passed through `throttle` with `latest: false`,
    /// scheduled on `MainScheduler.instance`.
    ///
    /// - Parameter throttle: Throttling interval handed to the `throttle` operator. Defaults to `1.0`.
    /// - Returns: An observable of `Void` tap events.
    public func throttledTapObservable(_ throttle : RxTimeInterval = 1.0) -> RxSwift.Observable<Swift.Void> {
        let taps = rx.tap.asObservable()
        let mainScheduler = MainScheduler.instance
        return taps.throttle(throttle, latest: false, scheduler: mainScheduler)
    }
}
/**
 Hacky way to turn the retryWhen operator into the (currently missing) repeatWhen operator.

 `repeatWhen` appends an error of this type to the source sequence so that a normal completion
 reaches `retryWhen` as an "error"; the handler then recognises it with `err is CoerceToRepeat`.
 It carries no state and needs no identity, so it is a value type.
 */
fileprivate struct CoerceToRepeat : Error {
}
extension UIViewController {
    /// Prints a line to the console each time one of this controller's appearance callbacks runs.
    ///
    /// `viewWillAppear`, `viewDidAppear`, `viewWillDisappear` and `viewDidDisappear` are observed
    /// through `rx.methodInvoked`; each invocation prints `"<name> <callback name>"`.
    /// The merged subscription is bounded by `takeUntil(rx.deallocated)`, which is why the
    /// returned disposable is intentionally discarded.
    ///
    /// - Parameter name: Prefix used in every printed line.
    public func printLifecycle(_ name : String) {
        // Builds one logging stream for a single lifecycle selector.
        func logging(_ selector: Selector, _ message: String) -> Observable<[Any]> {
            return self.rx.methodInvoked(selector)
                .do(onNext: { _ in print(message) })
        }

        let lifecycleEvents = [
            logging(#selector(UIViewController.viewWillAppear(_:)), "\(name) viewWillAppear"),
            logging(#selector(UIViewController.viewDidAppear(_:)), "\(name) viewDidAppear"),
            logging(#selector(UIViewController.viewWillDisappear(_:)), "\(name) viewWillDisappear"),
            logging(#selector(UIViewController.viewDidDisappear(_:)), "\(name) viewDidDisappear")
        ]

        _ = Observable.merge(lifecycleEvents)
            .takeUntil(self.rx.deallocated)
            .subscribe()
    }
}
extension ObservableType {

    /// Asynchronous `filter`: keeps an element only if the first value emitted by `filter(element)` is `true`.
    ///
    /// - Parameter filter: Produces, for each element, an observable whose first value decides
    ///   whether the element is forwarded. If it emits nothing, the element is dropped.
    /// - Returns: The elements that passed the asynchronous predicate.
    public func filterFlatMap(_ filter: @escaping (Self.E) -> RxSwift.Observable<Bool>) -> RxSwift.Observable<Self.E> {
        return self.flatMap { a -> Observable<Self.E> in
            filter(a).take(1).flatMap { b -> Observable<Self.E> in
                b ? Observable.just(a) : Observable.empty()
            }
        }
    }

    /// Replaces every element with `Void`.
    public func mapToVoid() -> RxSwift.Observable<Swift.Void> {
        return self.map { _ in }
    }

    /// Maps every element to its string-interpolated representation (`"\(element)"`).
    public func mapToString() -> RxSwift.Observable<String> {
        return self.map { "\($0)" }
    }

    /// `shareReplayLatestWhileConnected()` with a grace period before the shared source is torn down.
    ///
    /// Two things are delayed by `delayTime`:
    /// - completion of the shared source (an empty sequence with a delayed subscription is appended), and
    /// - each subscriber's real unsubscription from the shared source after it disposes.
    ///
    /// - Parameters:
    ///   - delayTime: How long the shared source is kept subscribed after disposal/completion.
    ///   - scheduler: Scheduler used for both delays. Defaults to `Schedulers.main`.
    /// - Returns: An observable backed by the lingering shared source.
    public func shareReplayLatestWhileConnectedWithLinger(_ delayTime : RxTimeInterval, scheduler : SchedulerType = Schedulers.main) -> RxSwift.Observable<Self.E> {
        let sharedSource = self.concat(Observable.empty()
            .delaySubscription(delayTime, scheduler: scheduler))
            .shareReplayLatestWhileConnected()
        return Observable<Self.E>.create { observer in
            let sub = sharedSource.subscribe(onNext: {
                observer.on(.next($0))
            }, onError: {
                observer.on(.error($0))
            }, onCompleted: {
                observer.on(.completed)
            })
            return Disposables.create {
                // Defer the real disposal so the shared subscription lingers for `delayTime`.
                _ = scheduler.scheduleRelative((), dueTime: delayTime) { _ in
                    sub.dispose()
                    return Disposables.create()
                }
            }
        }
    }

    /// Turns any error into a normal completion.
    public func ignoreErrors() -> RxSwift.Observable<Self.E> {
        return self.catchError { _ in Observable.empty() }
    }

    /// Forwards every `next` element to `observer` as a side effect while passing the sequence through.
    ///
    /// Only `next` events are mirrored; errors and completion are not sent to `observer`.
    public func multiplex(_ observer : AnyObserver<Self.E>) -> RxSwift.Observable<Self.E> {
        return self.do(onNext:{ observer.onNext($0) })
    }

    /// Resubscribes to the source after it completes, under control of `notificationHandler`.
    ///
    /// Implemented on top of `retryWhen`: a `CoerceToRepeat` error is appended to the source so
    /// that completion reaches the retry machinery. The handler receives a `Void` element for each
    /// completion of the source; genuine errors are passed on as errors of that sequence.
    ///
    /// - Parameter notificationHandler: Maps the sequence of completions to a trigger sequence.
    ///   Each trigger element causes a resubscription; when the trigger completes or fails, so
    ///   does the result.
    public func repeatWhen<TriggerObservable : ObservableType>(_ notificationHandler: @escaping (RxSwift.Observable<Swift.Void>) -> TriggerObservable) -> RxSwift.Observable<Self.E> {
        return self.concat(Observable<Self.E>.error(CoerceToRepeat()))
            .retryWhen { (errObs : Observable<Swift.Error>) -> TriggerObservable in
                return notificationHandler(errObs.flatMap { err -> Observable<Swift.Void> in
                    err is CoerceToRepeat ? Observable<Swift.Void>.just(()) : Observable<Swift.Void>.error(err)
                })
            }
    }

    /// Subscribes to the source `n` times in a row, then completes.
    ///
    /// The source is always subscribed at least once, even for `n <= 1`.
    public func `repeat`(_ n : Int) -> RxSwift.Observable<Self.E> {
        // The trigger must *complete* on the n-th completion for the result to complete.
        // Returning an inner `empty()` from flatMap would only swallow the signal and leave the
        // result hanging; `takeWhileWithIndex` completes without emitting, so no extra
        // resubscription is started.
        return self.repeatWhen { $0.takeWhileWithIndex { _, i in i + 1 < n } }
    }

    /// Resubscribes to the source every time it completes, indefinitely.
    public func `repeat`() -> RxSwift.Observable<Self.E> {
        return self.repeatWhen { $0 }
    }

    /// Repeats the source after each completion, waiting `backoffPolicy(index)` before resubscribing.
    ///
    /// - Parameters:
    ///   - backoffPolicy: Receives the zero-based index of the completion and returns the delay
    ///     before the next subscription. A value `<= 0` stops repeating and completes the result.
    ///   - scheduler: Scheduler the delay runs on.
    public func repeatWithBackoff(_ backoffPolicy: @escaping (Int) -> RxTimeInterval, scheduler: SchedulerType) -> RxSwift.Observable<Self.E> {
        return self.repeatWhen { completions -> Observable<Swift.Void> in
            return completions
                .mapWithIndex { _, i in backoffPolicy(i) }
                // Completing the trigger (rather than emitting nothing) is what ends the result.
                .takeWhile { $0 > 0 }
                .flatMap { backoff -> Observable<Swift.Void> in
                    Observable<Swift.Void>.just(()).delay(backoff, scheduler: scheduler)
                }
        }
    }

    /// Retries the source after each error, waiting `retryPolicy(index, error)` before resubscribing.
    ///
    /// - Parameters:
    ///   - retryPolicy: Receives the zero-based index of the failure and the error, and returns the
    ///     delay before the next attempt. A value `<= 0` gives up: the error is propagated to the
    ///     subscriber.
    ///   - scheduler: Scheduler the delay runs on.
    public func retryWithBackoff(_ retryPolicy: @escaping (Int, Swift.Error) -> RxTimeInterval, scheduler: SchedulerType) -> RxSwift.Observable<Self.E> {
        return self.retryWhen { (errors : Observable<Swift.Error>) -> Observable<Swift.Void> in
            return errors.flatMapWithIndex { err, i -> Observable<Swift.Void> in
                let backoff = retryPolicy(i, err)
                // Giving up must surface the failure; an inner `empty()` would swallow it and
                // leave the subscriber waiting forever.
                return backoff > 0
                    ? Observable<Swift.Void>.just(()).delay(backoff, scheduler: scheduler)
                    : Observable<Swift.Void>.error(err)
            }
        }
    }

    /// Ends the sequence when `viewController` receives `viewDidDisappear` or is deallocated.
    public func takeUntilDeallocOrDisappear(_ viewController : UIViewController) -> RxSwift.Observable<Self.E> {
        return self.takeUntil(Observable.merge([
            viewController.rx.methodInvoked(#selector(viewController.viewDidDisappear(_:))).mapToVoid(),
            viewController.rx.deallocated
        ]))
    }

    /// Applies `transform` to this sequence, allowing reusable operator chains to be composed inline.
    public func apply<T>(_ transform: (Observable<Self.E>) -> Observable<T>) -> Observable<T> {
        return transform(self.asObservable())
    }
}
/// Namespace for the schedulers and dispatch queues shared by the code base.
/// The initializer is private, so the type only serves as a holder of static members.
public class Schedulers {

    private init() {
    }

    /// The RxSwift main scheduler (`MainScheduler.instance`).
    public static let main = MainScheduler.instance

    /// Single-thread scheduler named "li.vin.thread.db" (presumably for database work; confirm with callers).
    public static let db: SingleThreadImmediateScheduler = SingleThreadImmediateScheduler("li.vin.thread.db")

    /// Concurrent queue at `.utility` QoS backing `io`.
    public static let ioQueue: DispatchQueue = DispatchQueue(
        label: "li.vin.queue.io",
        qos: .utility,
        attributes: .concurrent
    )

    /// Scheduler running work on `ioQueue`.
    public static let io: ConcurrentDispatchQueueScheduler = ConcurrentDispatchQueueScheduler(queue: ioQueue)

    /// Concurrent queue at `.userInteractive` QoS backing `computation`.
    public static let computationQueue: DispatchQueue = DispatchQueue(
        label: "li.vin.queue.computation",
        qos: .userInteractive,
        attributes: .concurrent
    )

    /// Scheduler running work on `computationQueue`.
    public static let computation: ConcurrentDispatchQueueScheduler = ConcurrentDispatchQueueScheduler(queue: computationQueue)
}
/// Scheduler that runs every scheduled action, in FIFO order, on one dedicated thread.
///
/// The thread is created lazily and started by the first call to `schedule`. The worker closure
/// holds a strong reference to the scheduler, so `dispose()` must be called to stop the thread
/// and release it. Actions still queued when the scheduler is disposed are not executed.
public class SingleThreadImmediateScheduler : ImmediateSchedulerType, Disposable {
    var started : Bool = false
    var disposed : Bool = false
    /// Guards `started`, `disposed` and `actions`.
    let lock = NSLock()
    /// Pending work: new actions are inserted at index 0 and the worker pops from the end (FIFO).
    var actions : [() -> ()] = []
    /// Signalled once per enqueued action and once on `dispose()`, so the worker thread sleeps
    /// while there is nothing to do instead of spinning on the lock.
    private let wakeUp = DispatchSemaphore(value: 0)
    lazy var thread : Thread = {
        let thr = Thread.init {
            while true {
                self.wakeUp.wait()
                // Checked after every wake-up: `dispose()` cancels the thread and then signals.
                if Thread.current.isCancelled { break }
                self.lock.lock()
                let action = self.actions.popLast()
                self.lock.unlock()
                action?()
            }
        }
        thr.name = self.name
        thr.qualityOfService = self.qos
        return thr
    }()
    let name : String
    let qos : QualityOfService

    /// - Parameters:
    ///   - name: Name given to the worker thread. Defaults to a fresh UUID string.
    ///   - qos: Quality of service of the worker thread. Defaults to `.utility`.
    public init(_ name : String = NSUUID().uuidString, qos : QualityOfService = .utility) {
        self.name = name
        self.qos = qos
    }

    /// Enqueues `action` to run on the scheduler's thread.
    ///
    /// - Returns: A disposable that, if disposed before the action starts, prevents it from
    ///   running; afterwards it wraps the disposable returned by `action`. When the scheduler is
    ///   already disposed, the action is dropped and an unassigned disposable is returned.
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        self.lock.lock()
        defer { self.lock.unlock() }
        if self.disposed { return cancel }
        if !self.started {
            self.started = true
            self.thread.start()
        }
        self.actions.insert({
            if !cancel.isDisposed { cancel.setDisposable(action(state)) }
        }, at: 0)
        // One signal per enqueued action: each wake-up of the worker pops exactly one action.
        self.wakeUp.signal()
        return cancel
    }

    /// Stops the worker thread (if it was started) and rejects any further scheduling.
    public func dispose() {
        self.lock.lock()
        defer { self.lock.unlock() }
        if self.disposed { return }
        self.disposed = true
        if self.started {
            self.thread.cancel()
            // Wake the worker so it notices the cancellation even when the queue is empty.
            self.wakeUp.signal()
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment