Created
April 4, 2014 21:56
-
-
Save lancewalton/9983866 to your computer and use it in GitHub Desktop.
Example of RX with a feedback loop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.mutable | |
import rx.lang.scala.{Observable, Subscriber, Subscription} | |
// This is here to let us construct an Observable that we can 'tell' of new events. | |
case class SimpleObservable[T](initial: Option[T] = None) { | |
private val subscribers: mutable.HashSet[Subscriber[T]] = new mutable.HashSet[Subscriber[T]] with mutable.SynchronizedSet[Subscriber[T]] | |
private var lastValue: Option[T] = initial | |
val observable = Observable { (subscriber: Subscriber[T]) => | |
subscribe(subscriber) | |
lastValue.foreach(subscriber.onNext) | |
subscriber.add(Subscription(unsubscribe(subscriber))) | |
} | |
def tell(t: T): Boolean = { | |
if (Option(t) != lastValue) { | |
lastValue = Some(t) | |
subscribers.foreach(_.onNext(t)) | |
true | |
} else false | |
} | |
private def subscribe(s: Subscriber[T]): Unit = subscribers += s | |
private def unsubscribe(s: Subscriber[T]): Unit = subscribers -= s | |
} | |
object Observables { | |
case class Mapper[T](result: Observable[T], mapAndTell: (T => T) => Boolean) | |
// This constructs a Mapper whose 'result' is an Observable and provides a function which we can use to | |
// map the current observables value, resulting in a new value for that observable | |
// e.g. | |
// val m = mapper[List[Int](Nil) | |
// m.mapAndTell(l => 1:: l) | |
// m.mapAndTell(l => 2:: l) | |
// would result in m.result initially having the value Nil, then 1 :: Nil, then 2 :: 1 :: Nil | |
// Note that it uses feedback to achieve this: the observable we give out is the 'accumulator'.observable | |
// When we use mapAndTell, the current accumulator.observable value is mapped with the given function and | |
// then we 'tell' the accumulator what it's new value is. | |
def mapper[T](initial: T) = { | |
val accumulator = SimpleObservable[T](Some(initial)) | |
val mapper = SimpleObservable[Function1[T, T]]() | |
val combiner = accumulator.observable combineLatest mapper.observable map { case (acc, map) ⇒ map(acc) } distinctUntilChanged | |
val combinerSubscription = combiner.subscribe { accumulator.tell(_) } | |
Mapper(accumulator.observable, mapper.tell _) | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment