Skip to content

Instantly share code, notes, and snippets.

@gzoller
Last active November 5, 2018 13:40
Show Gist options
  • Save gzoller/ffdbe3f4676e61b61998 to your computer and use it in GitHub Desktop.
Save gzoller/ffdbe3f4676e61b61998 to your computer and use it in GitHub Desktop.
Akka Stream FlexiFlow switch example
object PrimaryFlow extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val sj = ScalaJack()
val graph = FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val in = Source(List.empty[String]) // hook this up to RabbitMQ
val out = Sink.foreach( (mc:Command) => println("Command:" +mc) )
val boom = Sink.foreach( (c:Command) => println("Boom: "+c) )
val commandSwitch = builder.add(CommandRouter())
val merge = builder.add(Merge[Command](2))
val marshalCommand = Flow[String].map( sj.read[Command](_) ) // error handling here in case sj.read() fails
// Compiles but runtime error:
// Exception in thread "main" java.lang.IllegalArgumentException: Cannot build the RunnableGraph because there are unconnected ports: UniformFanOut.out0, UniformFanOut.out1, UniformFanOut.in
in ~> marshalCommand ~> commandSwitch.in
commandSwitch.outMsg ~> out
commandSwitch.outFallthru ~> boom
// Tried this too, wondering if having 2 Sinks in a FlowGraph is bad...maybe everything needs to resolve to 1 out?
// Anyway this produced the same exception.
//
//in ~> marshalCommand ~> commandSwitch.in
//commandSwitch.outMsg ~> ok ~> merge ~> out
//commandSwitch.outFallthru ~> bad ~> merge
}
}
// Complext bit... We have switch logic. Based on Command type we want to have separate
// routes (different stream to process each Command).
//
// First piece here defines the "pluggable" for the Flow: In=Command, Out=n outlets
import FanOutShape._
class CommandShape(_init: Init[Command] = Name[Command]("CommandRouter"))
extends FanOutShape[Command](_init) {
val outMsg = newOutlet[Command]("message")
val outFallthru = newOutlet[Command]("fallthru") // unknown message
protected override def construct(i: Init[Command]) = new CommandShape(i)
}
// This next bit is the Flow (FlexiRoute) that does the "smart fanout" based on the switch logic
// using the FanOutShape designed above.
case class CommandRouter() extends FlexiRoute[Command, CommandShape](new CommandShape, Attributes.name("CommandRouter")) {
import FlexiRoute._
override def createRouteLogic(p: PortT) = new RouteLogic[Command] {
override def initialState = State[Any](DemandFromAll(p.outMsg, p.outFallthru)) { (ctx, _, element) =>
element match {
case c: MessageCommand => ctx.emit(p.outMsg)(c)
// other commands here in the future
case unknown => ctx.emit(p.outFallthru)(unknown)
}
SameState
}
override def initialCompletionHandling = eagerClose
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment