Measuring Akka Streams Backpressure

Akka Streams Backpressure Gauge

Measures whether an Akka Stream is under backpressure at a given stage.

Motivation

When an Akka Stream doesn't perform as intended, it is difficult to tell where the bottleneck is, since backpressure makes all stages operate at the same rate (see e.g. here).

So when things are not as fast as expected, this pressure gauge can help you narrow down the slow stage. Imagine your bottleneck is a narrow passage in a water pipe. What you'll observe is that:

  • pressure before the bottleneck will be high (no matter how far before you measure)
  • pressure after the bottleneck will be low (no matter how far after you measure).

PressureGauge gives you a tool that follows this intuition.
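
The water-pipe intuition above can be turned into a small decision rule. The following is a minimal sketch, not part of the gist: `BottleneckLocator`, `locate` and the `Location` cases are hypothetical names, and the input is assumed to be the `underPressure` readings of several gauges, ordered from source to sink.

```scala
object BottleneckLocator {
  // Hypothetical result type: where the slow stage sits relative to the gauges.
  sealed trait Location
  case object UpstreamOfAllGauges extends Location
  case object DownstreamOfAllGauges extends Location
  final case class Between(gaugeBefore: Int, gaugeAfter: Int) extends Location

  // `readings` are the underPressure flags, ordered from source to sink.
  // Pressure is high everywhere before the bottleneck and low everywhere
  // after it, so the bottleneck follows the last gauge that reports pressure.
  def locate(readings: Seq[Boolean]): Location = {
    val lastHigh = readings.lastIndexOf(true)
    if (lastHigh < 0) UpstreamOfAllGauges // no gauge reports pressure (also the result for empty input)
    else if (lastHigh == readings.size - 1) DownstreamOfAllGauges // every gauge up to the last reports pressure
    else Between(lastHigh, lastHigh + 1)
  }
}
```

With one gauge on each side of a slow stage, the readings would be expected to be `Seq(true, false)`, for which `locate` returns `Between(0, 1)`.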

How to interpret the Data

PressureGauge is designed to provide useful information at low overhead. It therefore records a single boolean indicating whether the stage is under pressure. Although this is only binary information, the measurement will be consistent most of the time and therefore sufficient. Unless the rates of the gauge's upstream and downstream are within 1% of each other, the chance of getting different readings from one query to the next is very low.

If you need a more detailed picture, one option is to sample the pressure boolean at a fixed interval (e.g. every 50 milliseconds), retain a constant number of samples (e.g. 40), and average over them. A simple way to do this is shown in the example below. Expect to get 0% or 100% backpressure most of the time, and only very rarely values in between.

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Example {

  def sampleStream(): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Place one gauge before the throttle (the bottleneck) and one after it.
    val (before, after) = Source
      .repeat(1)
      .viaMat(PressureGauge())(Keep.right)
      .throttle(20, 1.second, 5, ThrottleMode.Shaping)
      .viaMat(PressureGauge())(Keep.both)
      .toMat(Sink.ignore)(Keep.left)
      .run()

    val beforeSamples = newSamples
    val afterSamples = newSamples

    implicit val ec: ExecutionContext = system.dispatcher

    // Take samples every 50 ms.
    system.scheduler.schedule(50.millis, 50.millis) {
      addSample(before.underPressure, beforeSamples)
      addSample(after.underPressure, afterSamples)
    }

    // Print the averaged backpressure once per second.
    system.scheduler.schedule(1.second, 1.second) {
      println(f"Backpressure before ${average(beforeSamples)}%3.0f %%, after ${average(afterSamples)}%3.0f %%")
    }

    Thread.sleep(20.seconds.toMillis)
    // Shut down the actor system so the scheduled tasks and the stream stop.
    system.terminate()
  }

  type Samples = AtomicReference[List[Boolean]]

  def newSamples: Samples = new AtomicReference(List.empty[Boolean])

  // Keep the 40 most recent samples. The get/set pair is not atomic, which is
  // acceptable here because each Samples instance has a single writer.
  def addSample(v: Boolean, s: Samples): Unit = s.set(v +: s.get().take(39))

  // Percentage of samples that were under pressure; 0.0 if there are no samples yet.
  def average(s: Samples): Double = {
    val list = s.get()
    if (list.isEmpty) 0.0 else (100.0 * list.count(identity)) / list.size
  }
}

import java.util.concurrent.atomic.AtomicBoolean

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.Attributes.Name
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

object PressureGauge {

  // Materialized value of the gauge; safe to query from outside the stream.
  trait State {
    def name: String
    def underPressure: Boolean
  }

  def apply[A](): PressureGauge[A] = new PressureGauge[A]()
}

class PressureGauge[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], PressureGauge.State] {
  val in = Inlet[A]("PressureGauge.in")
  val out = Outlet[A]("PressureGauge.out")

  override val shape = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, PressureGauge.State) = {
    // Starts as true: until downstream pulls for the first time, it has not signalled demand.
    val backpressure = new AtomicBoolean(true)

    val logic = new GraphStageLogic(shape) {
      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            // An element was handed downstream; we now wait for downstream demand.
            backpressure.set(true)
            push(out, grab(in))
          }
        }
      )
      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            // Downstream asked for more; we now wait for upstream to deliver.
            backpressure.set(false)
            pull(in)
          }
        }
      )
    }

    val state = new PressureGauge.State {
      override def underPressure: Boolean = backpressure.get()
      // Uses the stage's Name attribute, falling back to "pressureGauge".
      override val name: String = attr.get[Name](Name("pressureGauge")).n
    }

    (logic, state)
  }
}