Last active
November 5, 2018 13:40
-
-
Save gzoller/ffdbe3f4676e61b61998 to your computer and use it in GitHub Desktop.
Akka Stream FlexiFlow switch example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo entry point: builds a closed FlowGraph that unmarshals incoming strings into
// Commands and routes each one through CommandRouter to one of two sinks.
object PrimaryFlow extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  val sj = ScalaJack()

  // NOTE: the graph is only constructed here; nothing in this object runs it.
  val graph = FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] =>
    import FlowGraph.Implicits._

    val in = Source(List.empty[String]) // hook this up to RabbitMQ
    val out = Sink.foreach((mc: Command) => println("Command:" + mc))
    val boom = Sink.foreach((c: Command) => println("Boom: " + c))
    val commandSwitch = builder.add(CommandRouter())
    // TODO: error handling here in case sj.read() fails
    val marshalCommand = Flow[String].map(sj.read[Command](_))

    // Every junction registered with builder.add(...) must have ALL of its ports wired,
    // otherwise closing the graph fails at runtime with
    //   IllegalArgumentException: Cannot build the RunnableGraph because there are unconnected ports: ...
    // A Merge[Command](2) used to be added here without ever being connected, which
    // triggers exactly that kind of failure; it has been removed because this wiring
    // does not need it.
    in ~> marshalCommand ~> commandSwitch.in
    commandSwitch.outMsg ~> out
    commandSwitch.outFallthru ~> boom

    // Having two Sinks in one FlowGraph is fine. If both routes should instead end in
    // a single Sink, add the Merge and connect every one of its ports:
    //
    //   val merge = builder.add(Merge[Command](2))
    //   commandSwitch.outMsg      ~> merge
    //   commandSwitch.outFallthru ~> merge
    //   merge ~> out
  }
}
// Complex bit: we need switch logic. Based on the Command type we want separate
// routes (a different stream to process each kind of Command).
//
// The first piece defines the "pluggable" shape for the flow: In = Command, Out = n outlets.
import FanOutShape._ | |
// Fan-out shape for the command switch: one Command inlet (provided by FanOutShape)
// and two Command outlets registered below.
class CommandShape(_init: Init[Command] = Name[Command]("CommandRouter"))
    extends FanOutShape[Command](_init) {

  // Outlet for commands the router recognises as messages.
  val outMsg = newOutlet[Command]("message")

  // Outlet for everything else (unknown message).
  val outFallthru = newOutlet[Command]("fallthru")

  // Builds a new CommandShape from the supplied Init.
  protected override def construct(i: Init[Command]): CommandShape =
    new CommandShape(i)
}
// This next bit is the Flow (FlexiRoute) that does the "smart fanout" based on the switch logic | |
// using the FanOutShape designed above. | |
// FlexiRoute over CommandShape: each incoming Command is emitted on exactly one outlet,
// chosen by its runtime type.
case class CommandRouter()
    extends FlexiRoute[Command, CommandShape](new CommandShape, Attributes.name("CommandRouter")) {
  import FlexiRoute._

  override def createRouteLogic(p: PortT) = new RouteLogic[Command] {

    // Single state: the read condition covers both outlets, and the handler always
    // stays in this state after routing an element.
    override def initialState =
      State[Any](DemandFromAll(p.outMsg, p.outFallthru)) { (ctx, _, element) =>
        // Pick the destination first, then emit once.
        val target = element match {
          case _: MessageCommand => p.outMsg
          // other commands here in the future
          case _ => p.outFallthru
        }
        ctx.emit(target)(element)
        SameState
      }

    // Use the stock eagerClose completion handling provided by FlexiRoute.
    override def initialCompletionHandling = eagerClose
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment