Last active
August 29, 2015 14:07
-
-
Save unktomi/b46c9497f89460e6512e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main.scala.test | |
import scala.collection.mutable | |
// Push mode
// as in Rx
/**
 * Push-mode consumer of optional values of type `A`.
 *
 * Contravariant in `A`: an `Observer[Any]` can be used wherever an
 * `Observer[Int]` is expected.
 */
trait Observer[-A] { downstream =>

  /** Delivers the next optional value to this observer. */
  def onNext(x: Option[A]): Unit

  /**
   * Adapts this observer to accept `B`s by converting each present value with `f`
   * before handing it to this observer; `None` is passed through as `None`.
   *
   * @param f conversion applied to every `Some` payload
   * @return an observer of `B` that forwards to this observer
   */
  def contramap[B](f: B => A): Observer[B] =
    new Observer[B] {
      def onNext(y: Option[B]): Unit = y match {
        case Some(b) => downstream.onNext(Some(f(b)))
        case None    => downstream.onNext(None)
      }
    }
}
/**
 * Push-mode source of optional values of type `A`.
 */
trait Observable[A] {

  /**
   * Registers `x` to receive values from this source.
   *
   * @return a function that, when applied to `()`, cancels the registration
   */
  def subscribe(x: Observer[A]): Unit => Unit

  /** Observable whose values are this source's values transformed by `f`. */
  def map[B](f: A => B): Observable[B] =
    new MappedObservable[A, B](this, f)

  /** Observable built by feeding each value of this source through `f`. */
  def flatMap[B](f: A => Observable[B]): Observable[B] =
    new FlatMappedObservable[A, B](this, f)

  /** Cached pull-mode view; stays null until `observed` is first called. */
  var ob: Observed[A] = null

  /**
   * Conversion to pull mode.
   *
   * On the first call a fresh `Mutable` is subscribed to this source and cached in
   * `ob`; later calls return the cached instance. The cancel handle returned by
   * `subscribe` is not kept.
   */
  def observed: Observed[A] =
    if (ob != null) {
      ob
    } else {
      val cell = new Mutable[A]
      subscribe(cell)
      ob = cell
      cell
    }
}
/**
 * Observable that applies `f` to every present value emitted by `self`.
 *
 * @param self the upstream source
 * @param f    transformation applied to each `Some` payload; `None` passes through
 */
class MappedObservable[A, B](val self: Observable[A], val f: A => B) extends Observable[B] {

  /**
   * Subscribes to `self` with an adapter that maps each value through `f` and
   * forwards the result to `y`.
   *
   * @return the cancel handle produced by `self.subscribe`
   */
  override def subscribe(y: Observer[B]): Unit => Unit = {
    // contramap builds the same A-accepting adapter around `y`: it forwards `x.map(f)`.
    val adapter: Observer[A] = y.contramap[A](f)
    self.subscribe(adapter)
  }
}
/**
 * Observable that turns every value of `self` into an inner observable via `f` and
 * relays the inner observables' values one inner observable at a time.
 *
 * Behaviour per subscription:
 *  - A `Some(v)` from `self` produces `f(v)`. If no inner observable has been
 *    subscribed yet it becomes the active one; otherwise it is queued.
 *  - A `Some` from the active inner observable is forwarded to the subscriber as is.
 *  - A `None` from either `self` or the active inner observable switches to the next
 *    queued inner observable (cancelling the active one first), or is forwarded to
 *    the subscriber when the queue is empty.
 *
 * @param self the upstream source
 * @param f    produces the inner observable for each upstream value
 */
class FlatMappedObservable[A, B](val self: Observable[A], val f: A => Observable[B]) extends Observable[B] {

  /**
   * Subscribes `y` to the flattened stream.
   *
   * @return a handle that cancels the subscription to `self` AND the subscription
   *         to the currently active inner observable, and drops any queued inner
   *         observables. (Previously only the subscription to `self` was cancelled,
   *         so the active inner observable kept delivering to `y`.)
   */
  override def subscribe(y: Observer[B]): Unit => Unit = {
    // Inner observables waiting for their turn.
    val queue = new mutable.ListBuffer[Observable[B]]()
    // Cancel handle of the active inner subscription; null until the first one exists.
    // NOTE(review): it is never reset to null, so once an inner observable has been
    // subscribed every later upstream value is queued until some `None` arrives.
    var sub: Unit => Unit = null

    // Switches to the next queued inner observable, or forwards `None` downstream.
    def advance(): Unit =
      if (queue.nonEmpty) performNext(queue.remove(0)) else y.onNext(None)

    // Cancels the active inner subscription (if any) and subscribes to `ob` instead.
    def performNext(ob: Observable[B]): Unit = {
      // Pass the unit value explicitly instead of relying on `()` being inserted.
      if (sub != null) sub(())
      sub = ob.subscribe(new Observer[B] {
        override def onNext(x: Option[B]): Unit = x match {
          case Some(_) => y.onNext(x)
          case None    => advance()
        }
      })
    }

    val cancelSource = self.subscribe(new Observer[A] {
      override def onNext(x: Option[A]): Unit = x match {
        case Some(v) =>
          val ob = f(v)
          if (sub == null) performNext(ob) else queue += ob
        case None =>
          advance()
      }
    })

    (_: Unit) => {
      cancelSource(())
      if (sub != null) sub(())
      queue.clear()
    }
  }
}
// Subject, as in Rx: both an Observer and an Observable
/**
 * An observer that re-broadcasts everything it receives to its own subscribers.
 *
 * Subscribers are held in a weak map, so a subscriber that is not strongly
 * referenced elsewhere may be dropped by the garbage collector. Every subscription
 * is also recorded in `Observed.roots` (observer -> observables it subscribed to).
 */
class Subject[A] extends Observer[A] with Observable[A] {

  /** Current subscribers, weakly referenced. */
  val subscribers: mutable.WeakHashMap[Observer[A], Unit] = new mutable.WeakHashMap[Observer[A], Unit]

  /**
   * Forwards `x` to every subscriber registered at the moment of the call.
   *
   * The subscriber set is copied before dispatch: a subscriber may subscribe or
   * unsubscribe from inside its own `onNext` (FlatMappedObservable does so when it
   * switches inner observables), and mutating the underlying map while iterating
   * over it can fail with ConcurrentModificationException. A subscriber removed
   * during dispatch still receives the value currently being dispatched.
   */
  override def onNext(x: Option[A]): Unit = {
    val snapshot = subscribers.keySet.toList
    snapshot.foreach(_.onNext(x))
  }

  /**
   * Registers `x` as a subscriber and records this subject under `x` in
   * `Observed.roots`.
   *
   * @return a handle that calls `unsubscribe(x)`
   */
  override def subscribe(x: Observer[A]): Unit => Unit = {
    subscribers.put(x, ())
    Observed.roots.getOrElseUpdate(x, new mutable.HashSet[Observable[_]]).add(this)
    (_: Unit) => unsubscribe(x)
  }

  /**
   * Removes `x` from the subscribers and removes this subject from the set recorded
   * for `x` in `Observed.roots`. Does nothing for an observer that is not registered.
   */
  def unsubscribe(x: Observer[A]): Unit = {
    subscribers.remove(x)
    Observed.roots.get(x).foreach(_.remove(this))
  }
}
// Lazy pull mode
/**
 * A lazily computed, memoised value ("pull mode").
 *
 * `extract()` computes the value through `apply()` when the cache is not valid and
 * stores it in `memo`. `invalidate()` clears the valid flag, propagates to dependent
 * values and, when a push-mode `subject` exists, pushes the new value if it differs
 * from the previously memoised one.
 */
abstract class Observed[A] { outer =>

  /** Computes the current value; supplied by subclasses. */
  def apply(): A

  /** Most recently stored value, if any. */
  var memo: Option[A] = None

  /** Whether `memo` is currently treated as up to date by `extract()`. */
  var valid: Boolean = false

  /** Dependent values to invalidate, weakly referenced; null until the first dependent is added. */
  var listeners: mutable.WeakHashMap[Observed[_], Unit] = null

  /** Push-mode view of this value; null until `observable` is first called. */
  var subject: Subject[A] = null

  // Registers `x` as a dependent of this value and hands it back.
  private def addListener[B](x: Observed[B]): Observed[B] = {
    if (listeners == null) {
      listeners = new mutable.WeakHashMap[Observed[_], Unit]()
    }
    listeners.put(x, ())
    x
  }

  /** Marks the cache stale and runs `onInvalidate()`; a no-op when already stale. */
  def invalidate(): Unit =
    if (valid) {
      valid = false
      onInvalidate()
    }

  /** Invalidates dependents first, then notifies push-mode observers. */
  def onInvalidate(): Unit = {
    invalidateDependents()
    notifyObservers()
  }

  /**
   * Re-extracts the value and reports it when it differs from what was memoised
   * before the call (or when nothing was memoised); otherwise `None`.
   */
  def changed: Option[A] = {
    // Read the old memo BEFORE extract(), which may overwrite it.
    val before = memo
    val now = extract()
    if (before.exists(_ == now)) None else Some(now)
  }

  /** Pushes the changed value, if any, to `subject` when one has been created. */
  def notifyObservers(): Unit =
    if (subject != null) {
      changed.foreach(v => subject.onNext(Some(v)))
    }

  /** Invalidates every registered dependent. */
  def invalidateDependents(): Unit =
    if (listeners != null) {
      // Build the set of pending invalidations up front, then run them, so the
      // listener map is not being iterated while invalidation runs.
      val pending = listeners.keySet.map(dep => () => dep.invalidate())
      pending.foreach(thunk => thunk())
    }

  /** Conversion to push mode; creates the subject on first use. */
  def observable: Observable[A] = {
    if (subject == null) {
      subject = new Subject[A]
    }
    subject
  }

  /**
   * Functor: a dependent value equal to `f` applied to this value.
   *
   * The dependent recomputes `f` only when this value's `apply()` result differs
   * from the input it saw last time.
   */
  def map[B](f: A => B): Observed[B] =
    addListener(new Observed[B] {
      var lastInput: Option[A] = None
      def apply(): B = {
        // Deliberately apply(), not extract(): the upstream cache is left untouched.
        val input = outer()
        val wrapped = Some(input)
        if (lastInput != wrapped) {
          lastInput = wrapped
          memo = Some(f(input))
        }
        memo.get
      }
    })

  /** Monad (from Comonad): extracts this value, applies `f`, and extracts the result. */
  def flatMap[B](f: A => Observed[B]): Observed[B] =
    addListener(new Observed[B] {
      def apply(): B = f(outer.extract()).extract()
    })

  /** Comonad: returns the memoised value, computing and caching it first when stale. */
  def extract(): A = {
    if (!valid) {
      memo = Some(apply())
      valid = true
    }
    memo.get
  }

  /** Comonad: a dependent value computed by handing this whole `Observed` to `f`. */
  def coflatMap[B](f: Observed[A] => B): Observed[B] =
    addListener(new Observed[B] {
      def apply(): B = f(outer)
    })
}
/**
 * A settable `Observed` that is also an `Observer`: values pushed into it via
 * `onNext` (or `set`) become its memoised value.
 */
class Mutable[A] extends Observed[A] with Observer[A] {

  /** Current value; fails when nothing has been stored yet (see `apply`). */
  def get: A = apply()

  /** Stores `x` by pushing `Some(x)` through `onNext`. */
  def set(x: A): Unit = onNext(Some(x))

  /** For a mutable cell the "changed" value is simply whatever is memoised. */
  override def changed: Option[A] = memo

  /**
   * Marks the cell valid and, when `x` differs from the memoised value, stores it
   * and runs `onInvalidate()` so dependents and observers are informed.
   */
  def onNext(x: Option[A]): Unit = {
    valid = true
    if (x != memo) {
      memo = x
      onInvalidate()
    }
  }

  /** Returns the memoised value; `memo.get` throws when `memo` is `None`. */
  override def apply(): A = memo.get
}
/**
 * Companion holding the subscription registry, small factories and a demo `main`.
 */
object Observed {

  /**
   * Observer -> observables it has subscribed to, weakly keyed by observer.
   * Maintained by `Subject.subscribe` / `Subject.unsubscribe`.
   */
  val roots = new mutable.WeakHashMap[Observer[_], mutable.Set[Observable[_]]]()

  /** Creates an empty mutable reactive cell. */
  def ref[A]: Mutable[A] = new Mutable[A]

  /** Observer that prints `prefix` followed by each received value. */
  def Println(prefix: String): Observer[Any] = new Observer[Any] {
    def onNext(x: Option[Any]): Unit = println(prefix + x)
    override def toString: String = "Println " + prefix
  }

  /** Demo: derives pull-mode values from a reactive Int and observes them in push mode. */
  def main(argv: Array[String]): Unit = {
    def Printx = Println("push x = ")
    def Print2x = Println("push: x + x = ")
    def PrintOne = Println("push: 2x / 2x = ")
    def Print2xMinusOne = Println("push: 2x - 1 = ")

    // create a reactive Int
    val x = ref[Int]
    val ob = x.observable // convert pull to push
    ob.subscribe(Printx)

    val twox = for {
      i <- x
      j <- x
    } yield i + j
    val one = for (i <- twox) yield i / i

    twox.observable.subscribe(Print2x)
    one.observable.subscribe(PrintOne)

    val twoxMinusOne = for { i <- twox.observable } yield i - 1
    twoxMinusOne.subscribe(Print2xMinusOne)

    println("x=100...")
    val p = twoxMinusOne.observed // convert push to pull

    // `p` only holds a value after `twox` has pushed one. After the first set,
    // `twox` has not been pulled yet, so nothing is pushed and `p` is still empty;
    // calling p.extract() then would throw NoSuchElementException. Print only
    // when a value is present.
    def pullTwoxMinusOne(): Unit =
      p.memo.foreach(v => println("pull 2x - 1 = " + v))

    x.set(100)
    println("pull x + x = " + twox.extract())
    println("pull one = " + one.extract())
    pullTwoxMinusOne()

    println("x=99...")
    x.set(99)
    println("pull x + x = " + twox.extract())
    println("pull one = " + one.extract())
    pullTwoxMinusOne()

    /*
    Sample output:

    x=100...
    push x = Some(100)
    pull x + x = 200
    pull one = 1
    x=99...
    push: x + x = Some(198)
    push: 2x - 1 = Some(197)
    push x = Some(99)
    pull x + x = 198
    pull one = 1
    pull 2x - 1 = 197
    */
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment