Skip to content

Instantly share code, notes, and snippets.

@fchaillou
Created November 8, 2016 20:34
Show Gist options
  • Save fchaillou/fa34d1bd46d9c0f4db7c54fc8f535fe3 to your computer and use it in GitHub Desktop.
Save fchaillou/fa34d1bd46d9c0f4db7c54fc8f535fe3 to your computer and use it in GitHub Desktop.
Issue with callback on materialized value after stream is stopped because of error
package akka.stream.contrib
/**
 * An [[AsyncCallback]] that can be invoked before the real callback is installed and
 * after it has been torn down.
 *
 * The wrapper moves through three states:
 *  - not initialized: arguments passed to [[invoke]] are buffered;
 *  - initialized (after [[initCallback]]): buffered arguments are replayed in arrival
 *    order and subsequent arguments go straight to the installed function;
 *  - stopped (after [[stopCallback]]): arguments go to the fallback function.
 *
 * @tparam T type of the argument passed to the callback
 */
trait CallbackWrapper[T] extends AsyncCallback[T] {
  // Sealed so the compiler can check the state matches below for exhaustiveness.
  private sealed trait CallbackState
  private case class NotInitialized(list: List[T]) extends CallbackState
  private case class Initialized(f: T => Unit) extends CallbackState
  private case class Stopped(f: T => Unit) extends CallbackState

  /*
   * To preserve message order when switching between not initialized / initialized states
   * lock is used. Case is similar to RepointableActorRef
   */
  private[this] final val lock = new ReentrantLock
  private[this] val callbackState = new AtomicReference[CallbackState](NotInitialized(Nil))

  /**
   * Switches to the stopped state; every later [[invoke]] is routed to `f`.
   *
   * Arguments that were buffered because the callback was never initialized are handed
   * to `f` (oldest first) instead of being silently dropped.
   *
   * @param f fallback applied to arguments received once stopped
   */
  def stopCallback(f: T => Unit): Unit = locked {
    callbackState.getAndSet(Stopped(f)) match {
      case NotInitialized(pending) => pending.reverse.foreach(f)
      case _                       => ()
    }
  }

  /**
   * Installs the real callback and replays buffered arguments in arrival order.
   *
   * Calling this again replaces the installed function. Calling it after
   * [[stopCallback]] is a no-op: a stopped wrapper stays stopped.
   *
   * @param f function that receives every argument while initialized
   */
  def initCallback(f: T => Unit): Unit = locked {
    callbackState.get() match {
      case NotInitialized(pending) =>
        callbackState.set(Initialized(f))
        // The buffer is built by prepending, so reverse it to restore arrival order.
        pending.reverse.foreach(f)
      case Initialized(_) =>
        callbackState.set(Initialized(f))
      case Stopped(_) =>
        ()
    }
  }

  /**
   * Delivers `arg` according to the current state: buffered, passed to the installed
   * callback, or passed to the stop fallback.
   */
  override def invoke(arg: T): Unit = locked {
    callbackState.get() match {
      case Initialized(cb)         => cb(arg)
      case NotInitialized(pending) => callbackState.set(NotInitialized(arg :: pending))
      case Stopped(cb)             => cb(arg)
    }
  }

  /** Runs `body` while holding the lock, releasing it even if `body` throws. */
  private[this] def locked(body: => Unit): Unit = {
    lock.lock()
    try body finally lock.unlock()
  }
}
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.contrib
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
import akka.stream._
import akka.stream.contrib.SwitchMode.{ Close, Open }
import scala.concurrent.{ Future, Promise }
/**
* Pause/ Resume a Flow
*/
sealed trait ValveSwitch {
/**
 * Asks the valve to change its state.
 *
 * The request is asynchronous: the outcome is only known when the returned future completes.
 *
 * @param mode the mode the valve should switch to
 * @return a future that completes with `true` if the mode did change and `false` if the valve
 *         already was in the requested mode; the `Valve` stage in this file also completes it
 *         with `false` for requests made after the stage has stopped
 */
def flip(mode: SwitchMode): Future[Boolean]
}
object Valve {

  /**
   * Builds a [[Valve]] that starts in the [[SwitchMode.Open]] state.
   */
  def apply[A](): Valve[A] = new Valve[A](SwitchMode.Open)

  /**
   * Java API: builds a [[Valve]] that starts in the [[SwitchMode.Open]] state.
   */
  def create[A](): Valve[A] = apply[A]()

  /**
   * Builds a [[Valve]] that starts in the given state.
   *
   * @param mode state of the valve when the stream is materialized
   */
  def apply[A](mode: SwitchMode): Valve[A] = {
    val valve = new Valve[A](mode)
    valve
  }

  /**
   * Java API: builds a [[Valve]] that starts in the given state.
   *
   * @param mode state of the valve when the stream is materialized
   */
  def create[A](mode: SwitchMode): Valve[A] = new Valve[A](mode)
}
/**
* Materializes into a [[ValveSwitch]] which provides the method `flip` that stops or restarts the flow of elements passing through the stage. As long as the valve is closed it will backpressure.
*
* Note that closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost.
*
* @param mode state of the valve at the startup of the flow (by default Open)
*/
final class Valve[A](mode: SwitchMode) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  val in: Inlet[A] = Inlet[A]("valve.in")
  val out: Outlet[A] = Outlet[A]("valve.out")

  override val shape: FlowShape[A, A] = FlowShape(in, out)

  /**
   * Creates the stage logic and exposes its [[ValveSwitch]] as the materialized value.
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveGraphStageLogic(shape, mode)
    (logic, logic.switch)
  }

  /**
   * Stage logic: forwards elements while `mode` is open, holds them back while closed.
   *
   * Flip requests arrive through the mixed-in `CallbackWrapper`, which buffers them until
   * `preStart` and answers them with `false` after `postStop`.
   */
  private class ValveGraphStageLogic(shape: Shape, var mode: SwitchMode)
    extends GraphStageLogic(shape) with InHandler with OutHandler
    with CallbackWrapper[(SwitchMode, Promise[Boolean])] {

    // Most recent promise handed to the async callback. A request enqueued just before the
    // stage stops is never processed, so postStop completes this promise to avoid leaving
    // the caller's future pending forever. Only the latest request is tracked.
    private val lastPromise =
      new java.util.concurrent.atomic.AtomicReference[Option[Promise[Boolean]]](None)

    /** Applies a flip request inside the stage and reports whether the mode changed. */
    val callback = getAsyncCallback[(SwitchMode, Promise[Boolean])] {
      case (flipToMode, promise) =>
        val succeed = mode match {
          case _ if flipToMode == mode => false

          case Open =>
            mode = SwitchMode.Close
            true

          case Close =>
            if (isAvailable(in)) {
              // An element arrived while closed: release it now.
              push(out, grab(in))
            } else if (isAvailable(out) && !hasBeenPulled(in)) {
              // Downstream demand arrived while closed: forward it. The inlet may still have
              // an outstanding pull from before the valve was closed; pulling again would throw.
              pull(in)
            }
            mode = SwitchMode.Open
            true
        }

        promise.trySuccess(succeed)
    }

    /** The materialized handle; every flip is routed through the callback wrapper. */
    val switch = new ValveSwitch {
      override def flip(flipToMode: SwitchMode): Future[Boolean] = {
        val promise = Promise[Boolean]()
        invoke((flipToMode, promise))
        promise.future
      }
    }

    setHandlers(in, out, this)

    @scala.throws[Exception](classOf[Exception])
    override def preStart(): Unit = {
      super.preStart()
      initCallback {
        case (newMode, promise) =>
          lastPromise.set(Some(promise))
          callback.invoke((newMode, promise))
      }
    }

    @scala.throws[Exception](classOf[Exception])
    override def postStop(): Unit = {
      // Switch the wrapper to the stopped state first: once this returns, every new flip is
      // answered with false directly, and any earlier flip has already recorded its promise.
      stopCallback {
        case (_, promise) => promise.trySuccess(false)
      }
      // Complete a request that was enqueued but will never be processed; no-op if it was.
      lastPromise.getAndSet(None).foreach(_.trySuccess(false))
    }

    /** Forwards the element when open; when closed it stays in the inlet until re-opened. */
    override def onPush(): Unit = {
      if (isOpen) {
        push(out, grab(in))
      }
    }

    /** Forwards demand when open; when closed the demand is remembered on the outlet. */
    override def onPull(): Unit = {
      if (isOpen) {
        pull(in)
      }
    }

    private def isOpen = mode == SwitchMode.Open
  }
}
/**
 * State of a valve. Sealed so that matches over the mode are checked for exhaustiveness.
 */
sealed trait SwitchMode

object SwitchMode {
  /** Elements and demand pass through the valve. */
  case object Open extends SwitchMode

  /** The valve forwards neither elements nor demand. */
  case object Close extends SwitchMode
}
package akka.stream.contrib
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl._
import akka.stream.contrib.SwitchMode.{ Close, Open }
import org.scalatest._
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * Checks that a [[ValveSwitch]] still answers after its stream has failed.
 */
class ValveSpec extends WordSpec with ScalaFutures with BeforeAndAfterAll {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = materializer.executionContext

  // Shut the actor system down once the suite is done so its threads do not outlive the tests.
  override protected def afterAll(): Unit = {
    try system.terminate()
    finally super.afterAll()
  }

  "a valve on an error'd graph" should {
    "return false when trying to flip it" in {
      val ((sourceProbe, switch), sinkProbe) = TestSource.probe[Int]
        .viaMat(Valve(Open))(Keep.both)
        .toMat(TestSink.probe[Int])(Keep.both)
        .run()

      // Fail the stream and wait until the failure has reached the sink, i.e. passed the valve.
      val error = new RuntimeException("Boom !")
      sourceProbe.sendError(error)
      sinkProbe.request(1)
        .expectError(error)

      // Flipping after the failure must complete (with false) rather than hang forever.
      whenReady(switch.flip(Close)) {
        _ shouldBe false
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment