-
-
Save steinybot/cd9aeac534a45aead54c4cea089e0e40 to your computer and use it in GitHub Desktop.
import akka.NotUsed | |
import akka.stream.{Attributes, Inlet, SinkShape} | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.stream.stage._ | |
class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] { | |
val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in") | |
override val shape = SinkShape(in) | |
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { | |
override def preStart(): Unit = pull(in) | |
val awaitingElementHandler = new InHandler { | |
override def onPush(): Unit = { | |
val element = grab(in) | |
val innerSource = createInnerSource(element) | |
val innerSink = sink(element) | |
Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer) | |
} | |
override def onUpstreamFinish(): Unit = completeStage() | |
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) | |
} | |
setHandler(in, awaitingElementHandler) | |
def createInnerSource(element: T): SubSourceOutlet[T] = { | |
val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource") | |
innerSource.setHandler(new OutHandler { | |
override def onPull(): Unit = { | |
innerSource.push(element) | |
innerSource.complete() | |
if (isClosed(in)) { | |
completeStage() | |
} else { | |
pull(in) | |
setHandler(in, awaitingElementHandler) | |
} | |
} | |
override def onDownstreamFinish(): Unit = { | |
innerSource.complete() | |
if (isClosed(in)) { | |
completeStage() | |
} | |
} | |
}) | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
val illegalStateException = new IllegalStateException("Got a push that we weren't expecting") | |
innerSource.fail(illegalStateException) | |
failStage(illegalStateException) | |
} | |
override def onUpstreamFinish(): Unit = { | |
// We don't stop until the inner stream stops. | |
setKeepGoing(true) | |
} | |
override def onUpstreamFailure(ex: Throwable): Unit = { | |
innerSource.fail(ex) | |
failStage(ex) | |
} | |
}) | |
innerSource | |
} | |
} | |
} | |
object OneToOneOnDemandSink { | |
def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink)) | |
} |
Many thanks for this example @steinybot! 👍
I'm new to Akka Stream, and still getting my head around the various concepts and the API 🤔.
Would you know if it is possible for OneToOneOnDemandSink
to keep track of the materialised value of the inner sink (e.g. accumulate it?) or if there is a fundamental limitation I'm missing?
In my scenario, the inner sink is of type Sink[T, Future[IOResult]]
, and it would be convenient to terminate the program once all futures have completed and/or be able to do something in case of failures. However, GraphStage[SinkShape[T]]
inherently has a NotUsed
mat' value, and specialising OneToOneOnDemandSink
to be GraphStageWithMaterializedValue[SinkShape[T], Future[IOResult]]
isn't trivial -- at least to me.
After quite a bit of wrestling, the below alternative also seems to do the job, though is a lot more specialised than your solution:
def dispatch[T](
dispatcher: T => Path,
serializer: T => ByteString
)(
implicit materializer: Materializer
): Sink[T, Future[Seq[Future[IOResult]]]] =
Sink.fromGraph(
GraphDSL.create(
Sink.seq[Future[IOResult]]
) {
implicit builder =>
sink =>
// prepare this sink's graph elements:
val broadcast = builder.add(Broadcast[T](2))
val serialize = builder.add(Flow[T].map(serializer))
val dispatch = builder.add(Flow[T].map(dispatcher))
val zipAndWrite = builder.add(ZipWith[ByteString, Path, Future[IOResult]](
(bytes, path) => Source.single(bytes).runWith(FileIO.toPath(path)))
)
// connect the graph:
import GraphDSL.Implicits._
broadcast.out(0) ~> serialize ~> zipAndWrite.in0
broadcast.out(1) ~> dispatch ~> zipAndWrite.in1
zipAndWrite.out ~> sink
// expose ports:
SinkShape(broadcast.in)
}
)
The Future[Seq[Future[IOResult]]]
isn't pretty, but I guess can be flattened later on by the caller.
Comparison, comments & feedback welcome, as I'm still not sure about the pros & cons of both solutions.
See https://stackoverflow.com/questions/45192072/use-dynamic-sink-destination-in-akka-streams for more motivation and other solutions to the problem.