Last active
February 27, 2021 18:51
-
-
Save steinybot/cd9aeac534a45aead54c4cea089e0e40 to your computer and use it in GitHub Desktop.
A custom graph stage that materializes a new sink for every element
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.stream.{Attributes, Inlet, SinkShape} | |
import akka.stream.scaladsl.{Sink, Source} | |
import akka.stream.stage._ | |
/**
 * A sink stage that materializes a new inner sink for every element it receives.
 *
 * For each incoming element the `sink` factory is invoked with that element, and the
 * resulting sink is run against a single-element sub-source that emits the element and
 * then completes. The next element is only requested from upstream once the inner
 * stream has either taken its element or cancelled.
 *
 * The value returned by running each inner sink is not used by this stage.
 *
 * @param sink factory producing the sink that consumes a given element
 * @tparam T the element type
 * @tparam M the materialized value type of the inner sinks
 */
class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] {

  val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in")

  override val shape: SinkShape[T] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    // Request the first element as soon as the stage starts.
    override def preStart(): Unit = pull(in)

    /** Handler installed while no inner stream is in flight. */
    val awaitingElementHandler: InHandler = new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        // createInnerSource swaps the inlet handler to the "busy" handler as a side effect.
        val innerSource = createInnerSource(element)
        val innerSink = sink(element)
        Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    }

    setHandler(in, awaitingElementHandler)

    /**
     * Called once the current inner stream is done with its element: completes the stage
     * if upstream has already finished, otherwise goes back to waiting for the next element.
     */
    private def resumeOrFinish(): Unit =
      if (isClosed(in)) {
        completeStage()
      } else {
        setHandler(in, awaitingElementHandler)
        pull(in)
      }

    /**
     * Creates the single-element sub-source for `element` and installs an inlet handler
     * that guards the stage while the inner stream is running.
     *
     * @param element the element the sub-source will emit
     * @return the sub-source outlet, not yet materialized
     */
    def createInnerSource(element: T): SubSourceOutlet[T] = {
      val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource")

      innerSource.setHandler(new OutHandler {
        override def onPull(): Unit = {
          innerSource.push(element)
          innerSource.complete()
          resumeOrFinish()
        }

        override def onDownstreamFinish(): Unit = {
          innerSource.complete()
          // The inner sink cancelled without taking the element. Previously the stage only
          // reacted when upstream was already closed, so with an open upstream nothing
          // pulled again and the stream stalled. Move on to the next element instead.
          resumeOrFinish()
        }
      })

      // Handler used while an inner stream is in flight.
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          // No pull is issued while busy, so a push here is a protocol violation.
          val illegalStateException = new IllegalStateException("Got a push that we weren't expecting")
          innerSource.fail(illegalStateException)
          failStage(illegalStateException)
        }

        override def onUpstreamFinish(): Unit = {
          // We don't stop until the inner stream stops.
          setKeepGoing(true)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          innerSource.fail(ex)
          failStage(ex)
        }
      })

      innerSource
    }
  }
}
object OneToOneOnDemandSink {

  /**
   * Wraps a [[OneToOneOnDemandSink]] stage built from the given factory as a `Sink`.
   *
   * @param sink factory producing the sink that consumes a given element
   * @return a sink whose materialized value is `NotUsed`
   */
  def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = {
    val stage = new OneToOneOnDemandSink[T, M](sink)
    Sink.fromGraph(stage)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Many thanks for this example @steinybot! 👍
I'm new to Akka Stream, and still getting my head around the various concepts and the API 🤔.
Would you know if it is possible for
OneToOneOnDemandSink
to keep track of the materialised value of the inner sink (e.g. accumulate it), or is there a fundamental limitation I'm missing? In my scenario, the inner sink is of type
Sink[T, Future[IOResult]]
, and it would be convenient to terminate the program once all futures have completed and/or be able to do something in case of failures. However, GraphStage[SinkShape[T]]
inherently has a NotUsed
materialised value, and specialising OneToOneOnDemandSink
to be GraphStageWithMaterializedValue[SinkShape[T], Future[IOResult]]
isn't trivial -- at least to me. After quite a bit of wrestling, the alternative below also seems to do the job, though it is a lot more specialised than your solution:
The
Future[Seq[Future[IOResult]]]
isn't pretty, but I guess it can be flattened later on by the caller. Comparison, comments & feedback welcome, as I'm still not sure about the pros & cons of both solutions.