Last active
December 14, 2015 06:59
-
-
Save kevinwright/5046795 to your computer and use it in GitHub Desktop.
Signal and FocussedSignal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import akka.agent.Agent | |
| import akka.util.Timeout | |
| import akka.actor.ActorSystem | |
| import concurrent.{Future, Promise} | |
| import concurrent.duration.FiniteDuration | |
| import shapeless.Lens | |
| object Signal { | |
| def apply[T](initialValue: T)(implicit system: ActorSystem) = new Signal(initialValue) | |
| def forActorSystem[T](system: ActorSystem)(initialValue: T) = new Signal(initialValue)(system) | |
| } | |
| trait AbstractSignal[T] { | |
| def send(x: T) | |
| def send(fn: T => T) | |
| def get: T | |
| def await(implicit timeout: Timeout): T | |
| def next(implicit timeout: Timeout): Future[T] | |
| def waitUntil(cond: T => Boolean)(implicit timeout: Timeout): Future[T] | |
| } | |
| class Signal[T](initialValue: T)(implicit val system: ActorSystem) extends AbstractSignal[T] { | |
| import system.dispatcher | |
| private[this] case class Atom(now: T, next: Promise[Atom]) { | |
| def nextThat(cond: T => Boolean): Future[T] = | |
| Future(now) filter cond recoverWith { case _ => next.future flatMap {_ nextThat cond}} | |
| } | |
| private[this] def newAtom(value: T) = Atom(value, Promise()) | |
| private[this] val agent = Agent(newAtom(initialValue)) | |
| def send(x: T) { | |
| agent send { oldAtom => | |
| val newNow = newAtom(x) | |
| oldAtom.next success newNow | |
| newNow | |
| } | |
| } | |
| def send(fn: T => T) { | |
| agent send { oldAtom => | |
| val newNow = newAtom(fn(oldAtom.now)) | |
| oldAtom.next success newNow | |
| newNow | |
| } | |
| } | |
| def failIn(duration: FiniteDuration): Future[T] = { | |
| val p = Promise[T]() | |
| system.scheduler.scheduleOnce(duration) { FutureTimeoutException(duration) } | |
| p.future | |
| } | |
| def get: T = agent.get().now | |
| def await(implicit timeout: Timeout): T = agent.await(timeout).now | |
| def next(implicit timeout: Timeout): Future[T] = Future.firstCompletedOf( | |
| Seq( | |
| agent.future(timeout) flatMap { _.next.future map {_.now} }, | |
| failIn(timeout.duration) | |
| ) | |
| ) | |
| def waitUntil(cond: T => Boolean)(implicit timeout: Timeout): Future[T] = Future.firstCompletedOf( | |
| Seq( | |
| agent.future(timeout) flatMap { _ nextThat cond }, | |
| failIn(timeout.duration) | |
| ) | |
| ) | |
| def focusOn[X](lens: Lens[T,X]): AbstractSignal[X] = new FocusedSignal(this, lens) | |
| } | |
| class FocusedSignal[F,T](underlying: Signal[F], lens: Lens[F,T])(implicit val system: ActorSystem) extends AbstractSignal[T] { | |
| import system.dispatcher | |
| def send(x: T) { underlying.send(lens.set(_)(x)) } | |
| def send(fn: T => T) { underlying.send(lens.modify(_)(fn)) } | |
| def get: T = { lens.get(underlying.get) } | |
| def await(implicit timeout: Timeout): T = { lens.get(underlying.await) } | |
| def next(implicit timeout: Timeout): Future[T] = { underlying.next map lens.get } | |
| def waitUntil(cond: T => Boolean)(implicit timeout: Timeout): Future[T] = { | |
| underlying.waitUntil(f => cond(lens get f)) map lens.get | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment