Last active
June 28, 2018 03:15
-
-
Save tcw165/ba3090ea76d5bf1280e012c62921cb30 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * An [Observable] that relays the items of [src] only while a gate controlled
 * by [whenSrc] is open.
 *
 * - The gate starts closed. Every `true` emitted by [whenSrc] opens it and
 *   every `false` closes it.
 * - While the gate is closed, items from [src] are buffered. At most
 *   [bufferSize] items are retained; when the limit is exceeded the oldest
 *   items are dropped first.
 * - When the gate opens, the buffered items are flushed downstream in arrival
 *   order, followed by live items for as long as the gate stays open.
 * - Completion of [whenSrc] closes the gate permanently; it does not complete
 *   the downstream.
 * - Completion of [src] completes the downstream immediately. Items that are
 *   still buffered at that moment are discarded.
 * - An error from either source is forwarded downstream.
 *
 * Exactly one terminal event reaches the downstream, and both upstream
 * subscriptions are disposed as soon as it is decided. Errors arriving after
 * that point are handed to `RxJavaPlugins.onError`.
 */
open class TakeWhenObservable<T>(
    private val src: ObservableSource<T>,
    private val whenSrc: ObservableSource<Boolean>,
    private val bufferSize: Int = Flowable.bufferSize()
) : Observable<T>() {

    override fun subscribeActual(observer: Observer<in T>) {
        val coordinator = Coordinator(
            src = src,
            whenSrc = whenSrc,
            actualObserver = observer,
            bufferSize = bufferSize
        )
        coordinator.subscribe()
    }

    /**
     * Mediates between the two upstream observers and the downstream
     * [actualObserver]. It is also the [Disposable] handed to the downstream;
     * the inherited [AtomicBoolean] value is the "disposed" flag.
     *
     * All queue access and all downstream signals happen while holding this
     * object's monitor, so events coming from the two sources never interleave.
     */
    internal class Coordinator<T>(
        val src: ObservableSource<T>,
        val whenSrc: ObservableSource<Boolean>,
        val actualObserver: Observer<in T>,
        val bufferSize: Int
    ) : AtomicBoolean(false), Disposable {

        /** Current gate state; written by the `whenSrc` side, read while draining. */
        @Volatile
        private var gateOpen = false

        /** Flips to true once, when the single terminal event has been claimed. */
        private val terminated = AtomicBoolean(false)

        private val queue = SpscLinkedArrayQueue<T>(bufferSize)
        private val srcObserver = SrcObserver(parent = this)
        private val whenObserver = WhenObserver(parent = this)

        /** Hands this coordinator to the downstream, then subscribes to both sources. */
        fun subscribe() {
            // The downstream must receive its Disposable before any item.
            actualObserver.onSubscribe(this)
            // If the downstream disposed (or a source terminated) in the
            // meantime, DisposableHelper.setOnce in the inner observers disposes
            // the late upstream connection right away.
            src.subscribe(srcObserver)
            whenSrc.subscribe(whenObserver)
        }

        /**
         * Appends [t] to the buffer and drops the oldest entries while the
         * buffer holds more than [bufferSize] items.
         */
        fun bufferIt(t: T) {
            // Trimming polls the queue, exactly like drain() does, so it has to
            // run under the same monitor; otherwise two threads could consume
            // from the single-consumer queue at the same time.
            synchronized(this) {
                if (isDisposed || terminated.get()) {
                    // Nothing will ever be drained again; do not retain the item.
                    return
                }
                queue.offer(t)
                while (queue.size() > bufferSize) {
                    queue.poll()
                }
            }
        }

        /** Opens ([t] == true) or closes ([t] == false) the gate. */
        fun canDrain(t: Boolean) {
            gateOpen = t
        }

        /**
         * Emits buffered items downstream for as long as the gate is open and
         * the sequence has neither been disposed nor terminated.
         */
        fun drain() {
            synchronized(this) {
                while (!isDisposed && gateOpen && !terminated.get()) {
                    val item = queue.poll() ?: break
                    actualObserver.onNext(item)
                }
            }
        }

        /**
         * Completes the downstream unless a terminal event was already
         * delivered. Buffered items are discarded.
         */
        fun onComplete() {
            if (!terminated.compareAndSet(false, true)) {
                return
            }
            disposeUpstream()
            // Take the monitor so the terminal event cannot overlap with an
            // onNext that drain() is delivering from the other source's thread.
            synchronized(this) {
                queue.clear()
                actualObserver.onComplete()
            }
        }

        /**
         * Forwards [err] downstream unless a terminal event was already
         * delivered, in which case the error is routed to
         * `RxJavaPlugins.onError` instead of being signalled a second time.
         */
        fun onError(err: Throwable) {
            if (!terminated.compareAndSet(false, true)) {
                io.reactivex.plugins.RxJavaPlugins.onError(err)
                return
            }
            disposeUpstream()
            synchronized(this) {
                queue.clear()
                actualObserver.onError(err)
            }
        }

        override fun isDisposed(): Boolean {
            return get()
        }

        override fun dispose() {
            // Mark disposed first so a concurrent drain() stops at its next check.
            set(true)
            disposeUpstream()
            synchronized(this) {
                queue.clear()
            }
        }

        /** Cancels both upstream subscriptions. */
        private fun disposeUpstream() {
            srcObserver.dispose()
            whenObserver.dispose()
        }
    }

    /** Observes the data source and feeds its items into the [parent] coordinator. */
    internal class SrcObserver<T>(val parent: Coordinator<in T>) : Observer<T> {

        /** True once this observer has seen a terminal event. */
        @Volatile
        var done = false

        val disposable = AtomicReference<Disposable>()

        override fun onSubscribe(d: Disposable) {
            DisposableHelper.setOnce(disposable, d)
        }

        override fun onNext(t: T) {
            if (done) {
                return
            }
            parent.bufferIt(t)
            parent.drain()
        }

        override fun onComplete() {
            if (done) {
                return
            }
            done = true
            parent.onComplete()
        }

        override fun onError(e: Throwable) {
            if (done) {
                io.reactivex.plugins.RxJavaPlugins.onError(e)
                return
            }
            done = true
            parent.onError(e)
        }

        fun dispose() {
            DisposableHelper.dispose(disposable)
        }
    }

    /** Observes the gate source and opens/closes the gate of the [parent] coordinator. */
    internal class WhenObserver<T>(val parent: Coordinator<in T>) : Observer<Boolean> {

        /** True once this observer has seen a terminal event. */
        @Volatile
        var done = false

        val disposable = AtomicReference<Disposable>()

        override fun onSubscribe(d: Disposable) {
            DisposableHelper.setOnce(disposable, d)
        }

        override fun onNext(t: Boolean) {
            if (done) {
                return
            }
            parent.canDrain(t)
            parent.drain()
        }

        override fun onComplete() {
            // Close the gate permanently: once done is set, later values from a
            // misbehaving source can no longer reopen it.
            done = true
            parent.canDrain(false)
        }

        override fun onError(e: Throwable) {
            if (done) {
                io.reactivex.plugins.RxJavaPlugins.onError(e)
                return
            }
            done = true
            parent.onError(e)
        }

        fun dispose() {
            DisposableHelper.dispose(disposable)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment