Skip to content

Instantly share code, notes, and snippets.

@alexandru
Created May 6, 2016 07:01
Show Gist options
  • Select an option

  • Save alexandru/bbebe852f96f872d8401e6788009fa9c to your computer and use it in GitHub Desktop.

Select an option

Save alexandru/bbebe852f96f872d8401e6788009fa9c to your computer and use it in GitHub Desktop.
/** Describes the internal state of `withStateFrom`. */
sealed trait WithLatestFrom[+A,+B]
extends Product with Serializable
object WithLatestFrom {
/** Combines each element emitted by `source` with the latest
* element emitted by `b`.
*
* See:
*
* 1. http://rxmarbles.com/#withLatestFrom
* 2. http://reactivex.io/documentation/operators/combinelatest.html
*/
def apply[A,B,C](source: Observable[A], other: Observable[B])
(f: (A,B) => C): Observable[C] = {
val sa = source.map(a => Left(a) : Either[A,B])
val sb = other.map(b => Right(b) : Either[A,B])
val init = Init : WithLatestFrom[A,B]
val scanned = Observable.merge(sa, sb).scan(init) { (acc, elem) =>
acc match {
case Init =>
elem match {
case Left(a) => HasOther(a)
case Right(_) => Init
}
case HasOther(a) =>
elem match {
case Left(aa) => HasOther(aa)
case Right(b) => Emit(a,b)
}
case Emit(a,_) =>
elem match {
case Left(aa) => HasOther(aa)
case Right(b) => Emit(a,b)
}
}
}
scanned.collect {
case Emit(a,b) => f(a,b)
}
}
case object Init extends WithLatestFrom[Nothing,Nothing]
case class HasOther[+A](a: A) extends WithLatestFrom[A,Nothing]
case class Emit[+A,+B](a: A, b: B) extends WithLatestFrom[A,B]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment