Skip to content

Instantly share code, notes, and snippets.

@staltz
Created September 24, 2014 17:10
Show Gist options
  • Save staltz/12ff60f0e79b14c1cd1c to your computer and use it in GitHub Desktop.
Save staltz/12ff60f0e79b14c1cd1c to your computer and use it in GitHub Desktop.
.delayUntil() for RxJava
/**
* Delays all items from the source Observable up until when the other Observable
* emits its first item. After the other Observable emitted its first item,
* the source items are not delayed.
*
* source: ---s-s---s----------s---s----s---s---s--|>
* other: ------------o------o-------o------o-----|>
* result: ------------sss-----s---s----s---s---s--|>
*
* @param first
* The source Observable of main interest
* @param other
* The Observable that should be precursor to any items emitted by the source.
* @return an Observable that emits items only after the first item of the
* other Observable has emitted, otherwise behaves like the source Observable.
*/
def static <TFirst, TSecond> Observable<TFirst> delayUntil(
Observable<TFirst> source, Observable<TSecond> other)
{
val hotOther = other.publish().refCount()
val sourceBeforePrecursor = source
.buffer([ hotOther.take(1) ])
.flatMap([ list | Observable.from(list) ])
val sourceAfterPrecursor = source.skipUntil(hotOther)
return Observable.merge(sourceBeforePrecursor, sourceAfterPrecursor)
}
// This is written in Xtend (http://www.eclipse.org/xtend/) but the same code should
// be easily translatable to Java or Scala
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment