Created
September 22, 2016 02:59
-
-
Save tkawachi/db13b3aad80712d4a034a4756ab4786e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package foo | |
import akka.stream.ActorAttributes.SupervisionStrategy | |
import akka.stream.Attributes.name | |
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler} | |
import akka.stream.{Attributes, Inlet, SinkShape, Supervision} | |
import scala.annotation.tailrec | |
import scala.util.control.NonFatal | |
class ForeachResourceSink[T, S]( | |
create: () ⇒ S, | |
writeData: (S, T) ⇒ Unit, | |
close: (S) ⇒ Unit | |
) extends GraphStage[SinkShape[T]] { | |
val in = Inlet[T]("ForeachResourceSink.in") | |
override def shape: SinkShape[T] = SinkShape(in) | |
override def initialAttributes: Attributes = name("foreachResourceSink") and IODispatcher | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = | |
new GraphStageLogic(shape) with InHandler { | |
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) | |
var blockingStream: S = _ | |
setHandler(in, this) | |
override def preStart(): Unit = blockingStream = create() | |
override def onPush(): Unit = { | |
@tailrec | |
def loop(elem: T): Unit = { | |
try { | |
writeData(blockingStream, elem) | |
} catch { | |
case NonFatal(ex) => decider(ex) match { | |
case Supervision.Stop => | |
close(blockingStream) | |
failStage(ex) | |
case Supervision.Restart => | |
restartState() | |
loop(elem) | |
case Supervision.Resume => | |
loop(elem) | |
} | |
} | |
} | |
loop(grab(in)) | |
} | |
override def onUpstreamFinish() = closeStage() | |
private def restartState(): Unit = { | |
close(blockingStream) | |
blockingStream = create() | |
} | |
private def closeStage(): Unit = | |
try { | |
close(blockingStream) | |
completeStage() | |
} catch { | |
case NonFatal(ex) ⇒ failStage(ex) | |
} | |
override def toString: String = "ForeachResourceSink" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment