Skip to content

Instantly share code, notes, and snippets.

@tkawachi
Created September 22, 2016 02:59
Show Gist options
  • Save tkawachi/db13b3aad80712d4a034a4756ab4786e to your computer and use it in GitHub Desktop.
Save tkawachi/db13b3aad80712d4a034a4756ab4786e to your computer and use it in GitHub Desktop.
package foo
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Attributes.name
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.stream.{Attributes, Inlet, SinkShape, Supervision}
import scala.annotation.tailrec
import scala.util.control.NonFatal
class ForeachResourceSink[T, S](
create: () ⇒ S,
writeData: (S, T) ⇒ Unit,
close: (S) ⇒ Unit
) extends GraphStage[SinkShape[T]] {
val in = Inlet[T]("ForeachResourceSink.in")
override def shape: SinkShape[T] = SinkShape(in)
override def initialAttributes: Attributes = name("foreachResourceSink") and IODispatcher
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
var blockingStream: S = _
setHandler(in, this)
override def preStart(): Unit = blockingStream = create()
override def onPush(): Unit = {
@tailrec
def loop(elem: T): Unit = {
try {
writeData(blockingStream, elem)
} catch {
case NonFatal(ex) => decider(ex) match {
case Supervision.Stop =>
close(blockingStream)
failStage(ex)
case Supervision.Restart =>
restartState()
loop(elem)
case Supervision.Resume =>
loop(elem)
}
}
}
loop(grab(in))
}
override def onUpstreamFinish() = closeStage()
private def restartState(): Unit = {
close(blockingStream)
blockingStream = create()
}
private def closeStage(): Unit =
try {
close(blockingStream)
completeStage()
} catch {
case NonFatal(ex) ⇒ failStage(ex)
}
override def toString: String = "ForeachResourceSink"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment