Skip to content

Instantly share code, notes, and snippets.

@reline
Created September 19, 2019 15:56
Show Gist options
  • Save reline/109d23cca0451fde7110c9af2597cf01 to your computer and use it in GitHub Desktop.
Save reline/109d23cca0451fde7110c9af2597cf01 to your computer and use it in GitHub Desktop.
An RxJava Observable that emits only to its first subscriber
/**
 * Wraps this [Observable] in a [ColdObservableSingleSubscriberProxy].
 *
 * @return a new [Observable] backed by the proxy around the receiver.
 */
fun <T> Observable<T>.toColdSingleSubscribeable(): Observable<T> =
    ColdObservableSingleSubscriberProxy(this)
/**
 * An [Observable] that forwards only its first subscription to the wrapped [observable].
 *
 * Every subscription after the first is attached to `never()` instead of the
 * wrapped source.
 *
 * @param observable the upstream source handed to the first subscriber.
 */
class ColdObservableSingleSubscriberProxy<T>(private val observable: Observable<T>) : Observable<T>() {
    // Flipped from false to true by whichever subscription arrives first.
    private val hasBeenSubscribed = AtomicBoolean(false)

    /**
     * Routes [observer] to the wrapped source if this is the first subscription,
     * otherwise to `never()`.
     */
    override fun subscribeActual(observer: Observer<in T>) {
        // compareAndSet succeeds for exactly one caller, so only that caller sees the real source.
        val isFirstSubscriber = hasBeenSubscribed.compareAndSet(false, true)
        val source: Observable<T> = if (isFirstSubscriber) observable else never<T>()
        source.subscribe(observer)
    }
}
/**
 * Checks that only the first subscriber of a `toColdSingleSubscribeable()` observable
 * receives the value, and that neither subscriber ends up disposed.
 */
@Test
fun `assert observable fires only once ever`() {
    val singleShot = Observable.just(true).toColdSingleSubscribeable()

    // First subscription: expected to receive the single `true` item.
    val firstObserver = singleShot.test()
    firstObserver.assertValue(true)
    assertFalse(firstObserver.isDisposed)

    // Second subscription: expected to observe no values and remain empty.
    val secondObserver = singleShot.test()
    secondObserver.assertNoValues()
    secondObserver.assertEmpty()
    assertFalse(secondObserver.isDisposed)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment