Skip to content

Instantly share code, notes, and snippets.

@Renkai
Created April 6, 2021 13:25
Show Gist options
  • Save Renkai/d206a865fedbdd3f52c905004e8a7725 to your computer and use it in GitHub Desktop.
Save Renkai/d206a865fedbdd3f52c905004e8a7725 to your computer and use it in GitHub Desktop.
Demo to measure time consumption of a flow
/**
 * Demo showing how to measure how long each element spends inside an Akka Streams `Flow`.
 *
 * `apply` builds the timing wrapper, `plusOneFlow` is a deliberately slow sample flow,
 * and `main` wires the two together and prints the measurements.
 */
object TimedFlow {

  /**
   * Wraps `innerFlow` so that a pair of timestamps is reported for every element it emits.
   *
   * Each incoming element is tagged with `System.currentTimeMillis()`; the element is routed
   * through `innerFlow` while the timestamp bypasses it, and both are re-joined by a `Zip`.
   * A second timestamp is then taken and both are handed to `func`.
   *
   * NOTE(review): `Zip` pairs elements by arrival order, so this appears to assume that
   * `innerFlow` emits exactly one output per input, in the same order. Confirm before using
   * it with flows that filter, expand or reorder elements.
   *
   * @param innerFlow the flow whose per-element time is measured
   * @param func      callback invoked once per emitted element with `(beginTime, endTime)`,
   *                  both taken from `System.currentTimeMillis()`; its result is discarded
   * @tparam In  input element type of `innerFlow`
   * @tparam Out output element type of `innerFlow`
   * @return a flow with the same input and output types that emits the outputs of
   *         `innerFlow` unchanged
   */
  def apply[In, Out](innerFlow: Flow[In, Out, NotUsed], func: (Long, Long) => Any): Flow[In, Out, NotUsed] = {
    // Sub-graph that carries the start timestamp around `innerFlow`.
    val flowWithLong = Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import akka.stream.scaladsl.GraphDSL.Implicits._

      val unzip = builder.add(Unzip[In, Long]())
      val zip = builder.add(Zip[Out, Long]())

      // The payload goes through the measured flow; the timestamp takes the direct route.
      unzip.out0 ~> innerFlow ~> zip.in0
      unzip.out1 ~> zip.in1

      FlowShape(unzip.in, zip.out)
    })

    Flow[In]
      .map(in => (in, System.currentTimeMillis()))
      .via(flowWithLong)
      .map { case (out, beginTime) =>
        val endTime = System.currentTimeMillis()
        func(beginTime, endTime)
        out
      }
  }

  /**
   * Runs the demo: streams the integers 1 to 100 through a timed `plusOneFlow`, printing the
   * timestamps and their difference for every element, then shuts the actor system down.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("QuickStart")
    // Explicitly typed: implicit definitions should not rely on type inference.
    implicit val ec: scala.concurrent.ExecutionContext = system.dispatcher

    val source: Source[Int, NotUsed] = Source(1 to 100)
    val plusOneFlowWithTimePrint = TimedFlow(plusOneFlow(), (beginTime: Long, endTime: Long) => {
      println(s"begin ${beginTime} end ${endTime}")
      println(s"end - begin: ${endTime - beginTime}")
    })

    val done = source.via(plusOneFlowWithTimePrint).runForeach(println)
    done.onComplete { result =>
      // Report a failed stream instead of dropping the error silently.
      result.failed.foreach(e => System.err.println(s"Stream failed: $e"))
      // Terminate on success and on failure alike.
      system.terminate()
    }
  }

  /**
   * Sample flow used by the demo: adds one to every element.
   *
   * Blocks the calling thread for 50 ms per element, presumably to give the demo a
   * measurable duration.
   *
   * @return a flow mapping each `Int` to its successor
   */
  def plusOneFlow(): Flow[Int, Int, NotUsed] =
    Flow[Int].map { x =>
      Thread.sleep(50)
      x + 1
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment