Skip to content

Instantly share code, notes, and snippets.

@kevinwright
Last active December 14, 2015 06:59
Show Gist options
  • Select an option

  • Save kevinwright/5046795 to your computer and use it in GitHub Desktop.

Select an option

Save kevinwright/5046795 to your computer and use it in GitHub Desktop.
Signal and FocussedSignal
import akka.agent.Agent
import akka.util.Timeout
import akka.actor.ActorSystem
import concurrent.{Future, Promise}
import concurrent.duration.FiniteDuration
import shapeless.Lens
object Signal {
def apply[T](initialValue: T)(implicit system: ActorSystem) = new Signal(initialValue)
def forActorSystem[T](system: ActorSystem)(initialValue: T) = new Signal(initialValue)(system)
}
trait AbstractSignal[T] {
def send(x: T)
def send(fn: T => T)
def get: T
def await(implicit timeout: Timeout): T
def next(implicit timeout: Timeout): Future[T]
def waitUntil(cond: T => Boolean)(implicit timeout: Timeout): Future[T]
}
class Signal[T](initialValue: T)(implicit val system: ActorSystem) extends AbstractSignal[T] {
import system.dispatcher
private[this] case class Atom(now: T, next: Promise[Atom]) {
def nextThat(cond: T => Boolean): Future[T] =
Future(now) filter cond recoverWith { case _ => next.future flatMap {_ nextThat cond}}
}
private[this] def newAtom(value: T) = Atom(value, Promise())
private[this] val agent = Agent(newAtom(initialValue))
def send(x: T) {
agent send { oldAtom =>
val newNow = newAtom(x)
oldAtom.next success newNow
newNow
}
}
def send(fn: T => T) {
agent send { oldAtom =>
val newNow = newAtom(fn(oldAtom.now))
oldAtom.next success newNow
newNow
}
}
def failIn(duration: FiniteDuration): Future[T] = {
val p = Promise[T]()
system.scheduler.scheduleOnce(duration) { FutureTimeoutException(duration) }
p.future
}
def get: T = agent.get().now
def await(implicit timeout: Timeout): T = agent.await(timeout).now
def next(implicit timeout: Timeout): Future[T] = Future.firstCompletedOf(
Seq(
agent.future(timeout) flatMap { _.next.future map {_.now} },
failIn(timeout.duration)
)
)
def waitUntil(cond: T => Boolean)(implicit timeout: Timeout): Future[T] = Future.firstCompletedOf(
Seq(
agent.future(timeout) flatMap { _ nextThat cond },
failIn(timeout.duration)
)
)
def focusOn[X](lens: Lens[T,X]): AbstractSignal[X] = new FocusedSignal(this, lens)
}
class FocusedSignal[F,T](underlying: Signal[F], lens: Lens[F,T])(implicit val system: ActorSystem) extends AbstractSignal[T] {
import system.dispatcher
def send(x: T) { underlying.send(lens.set(_)(x)) }
def send(fn: T => T) { underlying.send(lens.modify(_)(fn)) }
def get: T = { lens.get(underlying.get) }
def await(implicit timeout: Timeout): T = { lens.get(underlying.await) }
def next(implicit timeout: Timeout): Future[T] = { underlying.next map lens.get }
def waitUntil(cond: T => Boolean)(implicit timeout: Timeout): Future[T] = {
underlying.waitUntil(f => cond(lens get f)) map lens.get
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment