-
-
Save rpassis/d8142bd6e4b1da9cd13a5583288799fe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// EmitWhile.swift | |
// | |
// Created by Daniel Tartaglia on 09/06/2018. | |
// Copyright © 2018 Daniel Tartaglia. MIT License. | |
// | |
import RxSwift | |
/** | |
Calls `producer` with `seed` then emits result and also passes it to `pred`. Will continue to call `producer` with new values as long as `pred` returns values. | |
- parameter seed: The starting value needed for the first producer call. | |
- parameter pred: This closure determines what the next value pass into producer should be or returns nil if no more calls are necessary. | |
- parameter producer: The function that produces the Single result. | |
- returns: An observable that emits each producer's value. | |
*/ | |
func emitWhile<T, U>(seed: U, pred: @escaping (T) -> U?, producer: @escaping (U) -> Single<T>) -> Observable<T> { | |
return Observable.create { observer in | |
var disposable = CompositeDisposable() // this is much like a bag to collect all the disposables and dispose them. | |
let lock = NSRecursiveLock() // A thread guard incase each producer uses a different thread. Probably not strictly necessary. | |
// create a function that we will "recurse" over. | |
func loop(_ u: U) { | |
let product = producer(u) // create the `Single` object. | |
let subDisposable = product.subscribe { event in // subscribe to it. | |
lock.lock(); defer { lock.unlock() } // take care of locking the thread. | |
switch event { | |
case let .error(error): // if the single errors, just emit it. | |
observer.onError(error) | |
case let .success(value): // if the single emits a value. | |
observer.onNext(value) // pass it on. | |
if let u = pred(value) { // if the pred returns a new input for another single. | |
loop(u) // recurse with the new input | |
} | |
else { | |
observer.onCompleted() // otherwise we're done. | |
} | |
} | |
} | |
_ = disposable.insert(subDisposable) // collect all our disposables. | |
} | |
loop(seed) // start the recursive chain with the seed value. | |
return disposable | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment