Last active
June 13, 2017 23:45
-
-
Save hussachai/a3266d3a4298493f857f593c304c9388 to your computer and use it in GitHub Desktop.
Bad snippet! Don't do it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object BadBidiFlow extends App { | |
implicit val system = ActorSystem() | |
implicit val ec = system.dispatcher | |
implicit val materializer = ActorMaterializer() | |
val source = Source.fromIterator(() => Seq("1", "2a", "3").toIterator) | |
val sink = Sink.foreach(println) | |
val endFlow = Flow.fromFunction[String, (String, Try[Int])]{ a => (a, Try(a.toInt)) } | |
val graph = BidiFlow.fromGraph(new BidiRetryGraph).join(endFlow) | |
val runnableGraph = source.via(graph).to(sink) | |
runnableGraph.run() | |
class BidiRetryGraph extends GraphStage[BidiShape[String, String, (String, Try[Int]), Try[Int]]] { | |
val in0 = Inlet[String]("in0") | |
val out0 = Outlet[String]("out0") | |
val in1 = Inlet[(String, Try[Int])]("in1") | |
val out1 = Outlet[Try[Int]]("out1") | |
override def shape: BidiShape[String, String, (String, Try[Int]), Try[Int]] = BidiShape.of(in0, out0, in1, out1) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
setHandler(in0, new InHandler(){ | |
override def onPush(): Unit = { | |
push(out0, grab(in0)) | |
} | |
}) | |
setHandler(out0, new OutHandler(){ | |
override def onPull(): Unit = { | |
pull(in0) | |
} | |
}) | |
setHandler(in1, new InHandler(){ | |
override def onPush(): Unit = { | |
val res = grab(in1) | |
res._2 match { | |
case Success(_) => | |
push(out1, res._2) | |
case Failure(e) => | |
val n = res._1.filter(c => c.isDigit).mkString | |
push(out0, n) // requirement failed: Cannot push port (out0) twice | |
} | |
} | |
}) | |
setHandler(out1, new OutHandler(){ | |
override def onPull(): Unit = { | |
pull(in1) | |
} | |
}) | |
} | |
} | |
} |
Author
hussachai
commented
Jun 13, 2017
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment