Skip to content

Instantly share code, notes, and snippets.

@tcw165
Last active June 28, 2018 03:15
Show Gist options
  • Select an option

  • Save tcw165/ba3090ea76d5bf1280e012c62921cb30 to your computer and use it in GitHub Desktop.

Select an option

Save tcw165/ba3090ea76d5bf1280e012c62921cb30 to your computer and use it in GitHub Desktop.
/**
* The Observable that emits the items emitted by the [src] Observable when a
* second ObservableSource, [whenSrc], emits an true; It buffers all them items
* emitted by the [src] when [whenSrc] emits an false.
*/
open class TakeWhenObservable<T>(private val src: ObservableSource<T>,
private val whenSrc: ObservableSource<Boolean>,
private val bufferSize: Int = Flowable.bufferSize())
: Observable<T>() {
override fun subscribeActual(observer: Observer<in T>) {
val coordinator = Coordinator(src = src,
whenSrc = whenSrc,
actualObserver = observer,
bufferSize = bufferSize)
coordinator.subscribe()
}
internal class Coordinator<T>(val src: ObservableSource<T>,
val whenSrc: ObservableSource<Boolean>,
val actualObserver: Observer<in T>,
val bufferSize: Int)
: AtomicBoolean(false),
Disposable {
@Volatile
private var canDrain = false
private val queue = SpscLinkedArrayQueue<T>(bufferSize)
private val srcObserver = SrcObserver(parent = this)
private val whenObserver = WhenObserver(parent = this)
fun subscribe() {
// Establish a connections between this coordinator and the external
// observer.
actualObserver.onSubscribe(this)
// Establish two connections between this coordinator and the two
// inner observers observing the two external input signals.
src.subscribe(srcObserver)
whenSrc.subscribe(whenObserver)
}
fun bufferIt(t: T) {
queue.offer(t)
// Drop item when size is over the buffer size
while (queue.size() > bufferSize) {
queue.poll()
}
}
fun canDrain(t: Boolean) {
canDrain = t
}
fun drain() {
synchronized(this) {
while (!isDisposed &&
canDrain &&
!srcObserver.done &&
!queue.isEmpty) {
val t = queue.poll()!!
this.actualObserver.onNext(t)
}
}
}
fun onComplete() {
if (!srcObserver.done) {
actualObserver.onComplete()
}
}
fun onError(err: Throwable) {
actualObserver.onError(err)
}
override fun isDisposed(): Boolean {
return get()
}
override fun dispose() {
// Mark disposed!
set(true)
srcObserver.dispose()
whenObserver.dispose()
synchronized(this) {
queue.clear()
}
}
}
internal class SrcObserver<T>(val parent: Coordinator<in T>)
: Observer<T> {
@Volatile
var done = false
val disposable = AtomicReference<Disposable>()
override fun onComplete() {
parent.onComplete()
done = true
}
override fun onSubscribe(d: Disposable) {
DisposableHelper.setOnce(disposable, d)
}
override fun onNext(t: T) {
if (!done) {
parent.bufferIt(t)
parent.drain()
}
}
override fun onError(e: Throwable) {
if (!done) {
parent.onError(e)
}
}
fun dispose() {
DisposableHelper.dispose(disposable)
}
}
internal class WhenObserver<T>(val parent: Coordinator<in T>)
: Observer<Boolean> {
@Volatile
var done = false
val disposable = AtomicReference<Disposable>()
override fun onComplete() {
// Close the throttle permanently
parent.canDrain(false)
}
override fun onSubscribe(d: Disposable) {
DisposableHelper.setOnce(disposable, d)
}
override fun onNext(t: Boolean) {
if (!done) {
parent.canDrain(t)
parent.drain()
}
}
override fun onError(e: Throwable) {
if (!done) {
parent.onError(e)
}
}
fun dispose() {
DisposableHelper.dispose(disposable)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment