Created
September 24, 2014 17:10
-
-
Save staltz/12ff60f0e79b14c1cd1c to your computer and use it in GitHub Desktop.
.delayUntil() for RxJava
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Delays all items from the source Observable up until when the other Observable | |
* emits its first item. After the other Observable emitted its first item, | |
* the source items are not delayed. | |
* | |
* source: ---s-s---s----------s---s----s---s---s--|> | |
* other: ------------o------o-------o------o-----|> | |
* result: ------------sss-----s---s----s---s---s--|> | |
* | |
* @param first | |
* The source Observable of main interest | |
* @param other | |
* The Observable that should be precursor to any items emitted by the source. | |
* @return an Observable that emits items only after the first item of the | |
* other Observable has emitted, otherwise behaves like the source Observable. | |
*/ | |
def static <TFirst, TSecond> Observable<TFirst> delayUntil( | |
Observable<TFirst> source, Observable<TSecond> other) | |
{ | |
val hotOther = other.publish().refCount() | |
val sourceBeforePrecursor = source | |
.buffer([ hotOther.take(1) ]) | |
.flatMap([ list | Observable.from(list) ]) | |
val sourceAfterPrecursor = source.skipUntil(hotOther) | |
return Observable.merge(sourceBeforePrecursor, sourceAfterPrecursor) | |
} | |
// This is written in Xtend (http://www.eclipse.org/xtend/) but the same code should | |
// be easily translatable to Java or Scala |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment