Last active
June 13, 2016 09:07
-
-
Save hochgi/596050e7249d2dd0118d7c0a77e1056b to your computer and use it in GitHub Desktop.
PartitionWith measure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build definition for the PartitionWith micro-benchmark.
name := "measure-partition-with"

scalaVersion := "2.11.8"

// The URL below is the Sonatype *releases* repository, so the resolver is labelled accordingly
// (it was previously labelled "Snapshots", which did not match the URL).
resolvers += "Sonatype OSS Releases" at "https://oss.sonatype.org/content/repositories/releases"

libraryDependencies ++= Seq(
  "com.storm-enroute" %% "scalameter" % "0.7",
  "com.typesafe.akka" %% "akka-stream" % "2.4.7"
)

cancelable in Global := true

// Register ScalaMeter as a test framework so sbt can discover its benchmarks.
testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework")

// Tests are executed sequentially (parallel execution is switched off).
parallelExecution in Test := false
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage._ | |
import com.typesafe.config.ConfigFactory | |
import org.scalameter._ | |
import scala.concurrent._ ,duration._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
object PartitionWithMeter { | |
object PartitionWith { | |
/**
  * Builds a fan-out graph that routes every incoming element to exactly one of
  * two outlets: `Left` results of `f` are emitted on `out0`, `Right` results on `out1`.
  *
  * `f` is applied exactly once per element. The resulting `Either` is what gets
  * partitioned and then unwrapped, so an expensive or side-effecting `f` is not
  * re-evaluated and no partial `Either` projection `.get` is required.
  *
  * @param f function that both classifies and transforms each element
  * @tparam In   type of the incoming elements
  * @tparam Out0 type emitted on `out0` (the `Left` side of `f`'s result)
  * @tparam Out1 type emitted on `out1` (the `Right` side of `f`'s result)
  * @return a graph whose shape is `FanOutShape2[In, Out0, Out1]`
  */
def getGraph[In, Out0, Out1](f: In => Either[Out0, Out1]): Graph[FanOutShape2[In, Out0, Out1], akka.NotUsed] =
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // Single evaluation point of `f`.
    val classify = b.add(Flow[In].map(f))
    // Port 0 receives the Lefts, port 1 the Rights.
    val route = b.add(Partition[Either[Out0, Out1]](2, either => either.fold(_ => 0, _ => 1)))
    // Each branch only ever sees one side of the Either, so `collect` simply unwraps it.
    val lefts = b.add(Flow[Either[Out0, Out1]].collect { case Left(l) => l })
    val rights = b.add(Flow[Either[Out0, Out1]].collect { case Right(r) => r })

    classify.out ~> route.in
    route.out(0) ~> lefts
    route.out(1) ~> rights

    new FanOutShape2(classify.in, lefts.out, rights.out)
  }
/**
  * Custom `GraphStage` with one inlet and two outlets that applies `p` to every
  * element and emits `Left` results on `out0` and `Right` results on `out1`.
  *
  * `p` is invoked once per grabbed element. When the outlet matching the result
  * has no demand, the result is parked until that outlet pulls. While a result is
  * parked no further element is pulled from upstream. If upstream finishes while
  * a result is parked, the stage completes only after that result has been pushed.
  *
  * @param p function that both classifies and transforms each element
  * @tparam In   type of the incoming elements
  * @tparam Out0 type emitted on `out0`
  * @tparam Out1 type emitted on `out1`
  */
def getGraphStage[In, Out0, Out1](p: In => Either[Out0, Out1]) = new GraphStage[FanOutShape2[In, Out0, Out1]] {
  override val shape = new FanOutShape2[In, Out0, Out1]("partitionWith")

  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
    import shape._

    // Result of `p` waiting for demand on its outlet; null means nothing is parked.
    private var parked: Either[Out0, Out1] = null

    // Pushes a freshly computed value; asks upstream for more only if the other outlet has demand too.
    private def emitFresh[A, B](target: Outlet[A], other: Outlet[B], value: A): Unit = {
      push(target, value)
      if (isAvailable(other)) pull(in)
    }

    // Pushes the parked value. If upstream is already closed this was the last element,
    // so the stage completes; otherwise the slot is cleared and pulling may resume.
    private def emitParked[A, B](target: Outlet[A], other: Outlet[B], value: A): Unit = {
      push(target, value)
      if (isClosed(in)) completeStage()
      else {
        parked = null
        if (isAvailable(other)) pull(in)
      }
    }

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val routed = p(grab(in))
        routed match {
          case Left(l) if isAvailable(out0)  => emitFresh(out0, out1, l)
          case Right(r) if isAvailable(out1) => emitFresh(out1, out0, r)
          // The matching outlet has no demand yet: hold on to the result.
          case _                             => parked = routed
        }
      }

      // Completion is postponed while a parked result still has to be delivered.
      override def onUpstreamFinish(): Unit =
        if (parked eq null) completeStage()
    })

    setHandler(out0, new OutHandler {
      override def onPull(): Unit = parked match {
        case null    => if (!hasBeenPulled(in)) pull(in)
        case Left(l) => emitParked(out0, out1, l)
        case _       => () // a Right is parked; it is released when out1 pulls
      }
    })

    setHandler(out1, new OutHandler {
      override def onPull(): Unit = parked match {
        case null     => if (!hasBeenPulled(in)) pull(in)
        case Right(r) => emitParked(out1, out0, r)
        case _        => () // a Left is parked; it is released when out0 pulls
      }
    })
  }
}
} | |
/**
  * Bidirectional stage that keeps a running count of elements that were forwarded
  * but not yet reported back.
  *
  *  - `in1` -> `out1`: every element is forwarded unchanged and increments the count.
  *  - `in2`: every received value is subtracted from the count; `in2` is pulled
  *    from `preStart` onwards.
  *  - The stage completes once `in1` is closed and the count is zero.
  *  - `out2` never carries real data: its handler prints a warning and pushes `0`
  *    if it is ever pulled.
  */
val counterStage = new GraphStage[BidiShape[Int, Int, Int, Int]] {
  override val shape = BidiShape[Int, Int, Int, Int](
    Inlet[Int]("counter.source"),
    Outlet[Int]("counter.sink"),
    Inlet[Int]("counter.decrement"),
    Outlet[Int]("counter.complete")
  )

  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
    import shape._

    // Number of forwarded elements that have not been reported back on in2 yet.
    private var inFlight: Int = 0

    // Start listening for decrements right away, independent of any downstream demand.
    override def preStart() = pull(in2)

    setHandler(in1, new InHandler {
      override def onPush() = {
        val elem = grab(in1)
        push(out1, elem)
        inFlight += 1
      }

      // With elements still in flight the stage stays alive; the in2 handler completes it later.
      override def onUpstreamFinish() = {
        if (inFlight == 0) {
          completeStage()
        }
      }
    })

    setHandler(in2, new InHandler {
      override def onPush() = {
        val reported = grab(in2)
        inFlight -= reported
        val drained = inFlight == 0 && isClosed(in1)
        if (drained) completeStage()
        else pull(in2)
      }
    })

    setHandler(out1, new OutHandler {
      // Demand is only propagated upstream while the source side is still open.
      override def onPull() = {
        if (!isClosed(in1)) {
          pull(in1)
        }
      }
    })

    setHandler(out2, new OutHandler {
      override def onPull() = {
        println("should not happen!")
        push(out2, 0)
      }
    })
  }
}
/**
  * Fan-in stage that passes `in0` through to `out` and uses `in1` purely as a
  * completion signal.
  *
  *  - `out` demand is translated into a pull on `in0`; each `in0` element is pushed to `out`.
  *  - Elements arriving on `in1` are discarded and `in1` is pulled again.
  *    Nothing in this stage issues the first pull on `in1`.
  *  - When `in1`'s upstream finishes, the whole stage is completed.
  */
val interceptCompletion = new GraphStage[FanInShape2[Int, Int, Int]] {
  override val shape = new FanInShape2[Int, Int, Int]("intercepter")

  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
    import shape._

    // Data path: downstream demand drives in0, and in0 elements go straight to out.
    setHandler(out, new OutHandler {
      override def onPull() = pull(in0)
    })

    setHandler(in0, new InHandler {
      override def onPush() = {
        val elem = grab(in0)
        push(out, elem)
      }
    })

    // Signal path: payloads are ignored, only completion matters.
    setHandler(in1, new InHandler {
      override def onPush() = {
        grab(in1)
        pull(in1)
      }

      override def onUpstreamFinish() = completeStage()
    })
  }
}
// ScalaMeter settings shared by both measurements: between 10 and 100 warm-up runs,
// 100 benchmark runs, verbose output, and the default warmer.
val standardConfig =
  config(
    Key.exec.minWarmupRuns -> 10,
    Key.exec.maxWarmupRuns -> 100,
    Key.exec.benchRuns -> 100,
    Key.verbose -> true
  ).withWarmer(new Warmer.Default)
/**
  * Measures the same cyclic stream twice, once with the GraphDSL-based
  * `PartitionWith.getGraph` and once with the custom `PartitionWith.getGraphStage`,
  * and prints both timings.
  *
  * The stream feeds the numbers 1 to 1000 through `counterStage` into a loop:
  * each value is transformed by `fun`; `Right` results are merged back into the
  * loop (preferred input), `Left` results leave the loop, are summed by the sink
  * and are also fed to `counterStage`'s decrement inlet.
  *
  * The actor system is terminated in a `finally` block so that a failing run does
  * not leave it running.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  implicit val system: ActorSystem = {
    val akkaConfig = ConfigFactory
      .parseString("akka.stream.materializer.auto-fusing=true")
      .withFallback(ConfigFactory.load())
    ActorSystem("default", akkaConfig)
  }

  try {
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // One step per pass through the loop: 1 exits as Left, even numbers are halved,
    // odd numbers become 3 * i + 1.
    val fun: Int => Either[Int, Int] = {
      case 1               => Left(1)
      case i if i % 2 == 0 => Right(i / 2)
      case i               => Right(i * 3 + 1)
    }

    // Builds and runs the benchmark graph around the given partitioning stage and
    // blocks until the sink's result is available. `Await.result` (rather than
    // `Await.ready`) is used so that a failed stream surfaces as an exception
    // instead of being timed as if it had succeeded.
    def runOnce(partitioner: Graph[FanOutShape2[Int, Int, Int], _]): Int = {
      val done = RunnableGraph.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
        implicit b => snk =>
          import GraphDSL.Implicits._

          val src = b.add(Source(1 to 1000))
          val cnt = b.add(counterStage)
          val itc = b.add(interceptCompletion)
          val mrg = b.add(MergePreferred[Int](1))
          val prt = b.add(partitioner)
          val bct = b.add(Broadcast[Int](2))

          src ~> cnt.in1
          cnt.out1 ~> mrg ~> prt.in
          cnt.out2 ~> itc.in1
          prt.out1 ~> itc.in0
          itc.out ~> mrg.preferred
          prt.out0 ~> bct ~> snk
          bct ~> cnt.in2

          ClosedShape
      }).run()
      Await.result(done, Duration.Inf)
    }

    // The partitioning graph is created inside the measured snippet, as before,
    // so its construction cost stays part of each measurement.
    val graphTime = standardConfig measure {
      runOnce(PartitionWith.getGraph(fun))
    }
    println(s"graphTime: $graphTime ms")

    val graphStageTime = standardConfig measure {
      runOnce(PartitionWith.getGraphStage(fun))
    }
    println(s"graphStageTime: $graphStageTime ms")
  } finally {
    // `shutdown()` is deprecated in Akka 2.4; `terminate()` is its replacement.
    system.terminate()
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment