I'm somewhat experienced with Akka Actors, but wanted to try Streams since it is the new hotness and compatible with the reactive streams initiative.
My use case is that I want to create a Source[GetActivityTaskResult]
to
interact with an AWS Step Functions State Machine. Basically, I want to
poll for tasks that my workers can operate on.
So I have code like this:
val activities: Source[GetActivityTaskResult] = ???
RunnableGraph.fromGraph(
GraphDSL.create(Sink.foreach(println)) { implicit builder => sink =>
import GraphDSL.Implicits._
activities ~>
sink
And this worked just fine, printing each of the tasks; however, I added a Flow
to do some work on the activities.
def doWork(activityTask: GetActivityTaskResult): Future[WorkResult] = ???
activities ~>
Flow[GetActivityTaskResult].mapAsync(doWork) ~>
sink
Things stopped printing to the console, so I knew that there was a problem with doing this anynchronous work. The solution was to add a hint that indicates asynchronous boundaries for the graph.
activities.async ~>
Flow[GetActivityTaskResult].mapAsync(doWork).async ~>
sink
Once I added the boundaries, results were printed from the Sink
as expected.