Last active
February 10, 2019 01:16
-
-
Save quelgar/d13d313e50bd1acd3b9bb9f687d8931a to your computer and use it in GitHub Desktop.
**NOTE**: I think this turned out to be pretty buggy. A custom Akka graph stage that caches the latest value for a period of time.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.{Clock, Instant} | |
import akka.NotUsed | |
import akka.actor.ActorSystem | |
import akka.stream.{Attributes, FlowShape, Inlet, Outlet, _} | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, _} | |
import scaladsl.{Flow, _} | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
/** | |
* A custom Akka graph stage that caches the latest value for a period of time. | |
* | |
* The upstream is only asked to produce a value if: there is demand from downstream and, | |
* the cache duration has expired. If the cache duration has not yet expired, all downstream pull | |
* requests are replied to with the cached value. | |
* | |
* The stage initially starts without a value, but in an expired state. This means the first pull | |
* from downstream will result in a pull to the upstream. | |
* | |
* This is useful when you always want to be able to immediately provide a value in response to | |
* downstream demand, but the source may be expensive to pull from, and you know the values emitted | |
* by the source change infrequently. | |
*/ | |
final class CacheStage[A](cacheFor: FiniteDuration, clock: Clock) extends GraphStage[FlowShape[A, A]] { | |
val in = Inlet[A]("CacheStage.in") | |
val out = Outlet[A]("CacheStage.out") | |
override def shape: FlowShape[A, A] = FlowShape.of(in, out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
private var currentValue: A = _ | |
private var expires = Instant.MIN | |
setHandlers(in, out, new InHandler with OutHandler { | |
override def onPush(): Unit = { | |
currentValue = grab(in) | |
expires = clock.instant() plusMillis cacheFor.toMillis | |
push(out, currentValue) | |
} | |
override def onPull(): Unit = { | |
if (clock.instant() isAfter expires) { | |
pull(in) | |
} else { | |
push(out, currentValue) | |
} | |
} | |
}) | |
} | |
} | |
object CacheStage { | |
def apply[A](cacheFor: FiniteDuration, clock: Clock): Flow[A, A, NotUsed] = { | |
Flow.fromGraph(new CacheStage(cacheFor, clock)) | |
} | |
} | |
object CacheStageTest { | |
def main(args: Array[String]): Unit = { | |
val src = Source(1 to 5) | |
implicit val system = ActorSystem("CacheStageTest") | |
implicit val materializer = ActorMaterializer() | |
val clock = Clock.systemDefaultZone() | |
// demand a value every second, cache each value for 5 seconds | |
val flow = src.via(CacheStage(5 second span, clock)) | |
.delay(1 second span, DelayOverflowStrategy.backpressure) | |
.withAttributes(Attributes.inputBuffer(1, 1)) | |
Await.result(flow.runForeach(i => println(f"${clock.instant().toEpochMilli}%TT : $i")), 1 minute span) | |
println(Await.result(system.terminate(), 1 minute span)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment