Created
May 6, 2016 07:01
-
-
Save alexandru/bbebe852f96f872d8401e6788009fa9c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** Describes the internal state of `withStateFrom`. */ | |
| sealed trait WithLatestFrom[+A,+B] | |
| extends Product with Serializable | |
| object WithLatestFrom { | |
| /** Combines each element emitted by `source` with the latest | |
| * element emitted by `b`. | |
| * | |
| * See: | |
| * | |
| * 1. http://rxmarbles.com/#withLatestFrom | |
| * 2. http://reactivex.io/documentation/operators/combinelatest.html | |
| */ | |
| def apply[A,B,C](source: Observable[A], other: Observable[B]) | |
| (f: (A,B) => C): Observable[C] = { | |
| val sa = source.map(a => Left(a) : Either[A,B]) | |
| val sb = other.map(b => Right(b) : Either[A,B]) | |
| val init = Init : WithLatestFrom[A,B] | |
| val scanned = Observable.merge(sa, sb).scan(init) { (acc, elem) => | |
| acc match { | |
| case Init => | |
| elem match { | |
| case Left(a) => HasOther(a) | |
| case Right(_) => Init | |
| } | |
| case HasOther(a) => | |
| elem match { | |
| case Left(aa) => HasOther(aa) | |
| case Right(b) => Emit(a,b) | |
| } | |
| case Emit(a,_) => | |
| elem match { | |
| case Left(aa) => HasOther(aa) | |
| case Right(b) => Emit(a,b) | |
| } | |
| } | |
| } | |
| scanned.collect { | |
| case Emit(a,b) => f(a,b) | |
| } | |
| } | |
| case object Init extends WithLatestFrom[Nothing,Nothing] | |
| case class HasOther[+A](a: A) extends WithLatestFrom[A,Nothing] | |
| case class Emit[+A,+B](a: A, b: B) extends WithLatestFrom[A,B] | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment