Skip to content

Instantly share code, notes, and snippets.

@mekarthedev
Last active April 13, 2018 23:29
Show Gist options
  • Save mekarthedev/59b9a4370ea74e8b5ace85b0dc0f1c01 to your computer and use it in GitHub Desktop.
Save mekarthedev/59b9a4370ea74e8b5ace85b0dc0f1c01 to your computer and use it in GitHub Desktop.
Effective flatMapFirst for RxJava2's Observable
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicBoolean
/**
 * Maps each upstream item to an inner [Observable] via [transform], but only while no
 * previously mapped inner observable is still running.
 *
 * An upstream item that arrives while an inner observable is active is discarded: it is
 * filtered out before [transform] is ever called for it. Once the active inner observable
 * terminates, the next upstream item is accepted again.
 *
 * The "active" flag is created inside [Observable.defer], so it is not shared between
 * separate subscriptions to the returned observable.
 *
 * @param transform produces the inner observable for an accepted upstream item.
 * @return an observable relaying the emissions of the accepted inner observables.
 */
fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
    Observable.defer {
        // true while an inner observable produced by `transform` has not terminated yet.
        val innerActive = AtomicBoolean()
        val upstream = this@flatMapFirst

        upstream
            // Atomically claim the slot; items that fail to claim it are dropped here.
            .filter { innerActive.compareAndSet(false, true) }
            .flatMap { accepted ->
                // Free the slot once the inner observable has terminated.
                transform(accepted).doAfterTerminate { innerActive.set(false) }
            }
    }
// Usage demo for flatMapFirst: feeds four inner subjects through `input` and prints
// whatever the operator lets through.
val input = PublishSubject.create<BehaviorSubject<Int>>()
// The transform is the identity, so every accepted subject is itself the inner observable.
input.flatMapFirst { it }.doOnNext { println(it) }.subscribe()
// p1 arrives while nothing is in flight, so it is accepted and its value is printed.
val p1 = BehaviorSubject.create<Int>()
input.onNext(p1)
p1.onNext(1)
// p2 arrives while p1 has not completed yet, so it is dropped; its value is never observed.
val p2 = BehaviorSubject.create<Int>()
input.onNext(p2)
p2.onNext(2)
p2.onComplete()
// p3 is dropped for the same reason: p1 is still the active inner observable.
val p3 = BehaviorSubject.create<Int>()
input.onNext(p3)
p3.onNext(3)
p3.onComplete()
// Completing p1 resets the busy flag, so the next inner subject can be accepted.
p1.onComplete()
// p4 arrives after p1 has completed, so it is accepted and its value is printed.
val p4 = BehaviorSubject.create<Int>()
input.onNext(p4)
p4.onNext(4)
p4.onComplete()
// Expected output: 1 and 4 (2 and 3 belong to the dropped subjects p2 and p3).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment