Created
November 8, 2016 20:34
-
-
Save fchaillou/fa34d1bd46d9c0f4db7c54fc8f535fe3 to your computer and use it in GitHub Desktop.
Issue with callback on materialized value after stream is stopped because of error
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.stream.contrib | |
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

import akka.stream.stage.AsyncCallback

/**
 * An [[akka.stream.stage.AsyncCallback]] that can be invoked before the real callback exists
 * and after it has gone away.
 *
 * The wrapper moves through three states:
 *  - not initialized: arguments passed to `invoke` are buffered;
 *  - initialized (after `initCallback`): buffered arguments are replayed in invocation order,
 *    later arguments are handed straight to the registered handler;
 *  - stopped (after `stopCallback`): every argument is handed to the stop handler.
 *
 * All handlers are called while the internal lock is held, so they should be short.
 *
 * @tparam T type of the argument carried by each invocation
 */
trait CallbackWrapper[T] extends AsyncCallback[T] {

  private sealed trait CallbackState
  private case class NotInitialized(pending: List[T]) extends CallbackState
  private case class Initialized(f: T ⇒ Unit) extends CallbackState
  private case class Stopped(f: T ⇒ Unit) extends CallbackState

  /*
   * To preserve message order when switching between not initialized / initialized states
   * a lock is used: every state transition and the delivery it triggers happen under it.
   * The case is similar to RepointableActorRef.
   */
  private[this] final val lock = new ReentrantLock
  private[this] val callbackState = new AtomicReference[CallbackState](NotInitialized(Nil))

  /**
   * Switches to the stopped state; from now on every invocation is passed to `f`.
   *
   * Arguments that were buffered because `initCallback` was never called are passed to `f`
   * as well (oldest first) instead of being silently dropped.
   *
   * @param f handler for invocations that can no longer reach the real callback
   */
  def stopCallback(f: T ⇒ Unit): Unit = locked {
    callbackState.getAndSet(Stopped(f)) match {
      case NotInitialized(pending)       ⇒ pending.reverse.foreach(f)
      case Initialized(_) | Stopped(_) ⇒ ()
    }
  }

  /**
   * Registers the real callback and replays, oldest first, everything buffered so far.
   *
   * Calling it again replaces the handler. Calling it after `stopCallback` has no effect:
   * the stopped state is terminal.
   *
   * @param f handler that receives buffered and future invocations
   */
  def initCallback(f: T ⇒ Unit): Unit = locked {
    callbackState.get() match {
      case NotInitialized(pending) ⇒
        callbackState.set(Initialized(f))
        // The buffer is built by prepending, so reverse it to restore invocation order.
        pending.reverse.foreach(f)
      case Initialized(_) ⇒
        callbackState.set(Initialized(f))
      case Stopped(_) ⇒
        ()
    }
  }

  /**
   * Delivers `arg` to the handler of the current state, or buffers it while not initialized.
   *
   * @param arg argument of this invocation
   */
  override def invoke(arg: T): Unit = locked {
    callbackState.get() match {
      case Initialized(cb)         ⇒ cb(arg)
      // Every mutation happens under the lock, so a plain set cannot lose an update.
      case NotInitialized(pending) ⇒ callbackState.set(NotInitialized(arg :: pending))
      case Stopped(cb)             ⇒ cb(arg)
    }
  }

  /** Runs `body` while holding the lock and always releases the lock afterwards. */
  private[this] def locked(body: ⇒ Unit): Unit = {
    lock.lock()
    try body finally lock.unlock()
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com> | |
*/ | |
package akka.stream.contrib | |
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler } | |
import akka.stream._ | |
import akka.stream.contrib.SwitchMode.{ Close, Open } | |
import scala.concurrent.{ Future, Promise } | |
/**
 * Handle used to pause and resume a flow that contains a [[Valve]].
 */
sealed trait ValveSwitch {

  /**
   * Asks the valve to change its state.
   *
   * @param mode the mode the valve should be in afterwards
   * @return a future that completes with `true` if the mode did change and `false` if the valve
   *         already was in the requested mode; the [[Valve]] stage in this file also completes
   *         it with `false` when the request arrives after the stage has stopped
   */
  def flip(mode: SwitchMode): Future[Boolean]
}
object Valve {

  /**
   * Creates a [[Valve]] that starts in [[SwitchMode.Open]].
   */
  def apply[A](): Valve[A] = apply[A](SwitchMode.Open)

  /**
   * Java API: creates a [[Valve]] that starts in [[SwitchMode.Open]].
   */
  def create[A](): Valve[A] = create[A](SwitchMode.Open)

  /**
   * Creates a [[Valve]] that starts in the given mode.
   *
   * @param mode state of the valve when the flow starts
   */
  def apply[A](mode: SwitchMode): Valve[A] = new Valve[A](mode)

  /**
   * Java API: creates a [[Valve]] that starts in the given mode.
   *
   * @param mode state of the valve when the flow starts
   */
  def create[A](mode: SwitchMode): Valve[A] = new Valve[A](mode)
}
/**
 * Materializes into a [[ValveSwitch]] whose `flip` method stops or restarts the flow of elements passing through the stage. As long as the valve is closed it will backpressure.
 *
 * Note that closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost.
 *
 * @param mode state of the valve at the startup of the flow (by default Open)
 */
final class Valve[A](mode: SwitchMode) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  val in: Inlet[A] = Inlet[A]("valve.in")
  val out: Outlet[A] = Outlet[A]("valve.out")

  override val shape = FlowShape(in, out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveGraphStageLogic(shape, mode)
    (logic, logic.switch)
  }

  /**
   * Stage logic. `mode` is only read and written from the stage's own callbacks; requests coming
   * from the materialized [[ValveSwitch]] reach it through the [[CallbackWrapper]] and the
   * stage's async callback.
   */
  private class ValveGraphStageLogic(shape: Shape, var mode: SwitchMode) extends GraphStageLogic(shape) with InHandler with OutHandler
    with CallbackWrapper[(SwitchMode, Promise[Boolean])] {

    // Promises handed out by `flip` that nobody has completed yet. A request given to the async
    // callback just before the stage stops may never be processed, so postStop completes
    // whatever is still registered here.
    private val inFlight = new java.util.concurrent.ConcurrentLinkedQueue[Promise[Boolean]]()

    val callback = getAsyncCallback[(SwitchMode, Promise[Boolean])] {
      case (flipToMode, promise) =>
        val succeed = mode match {
          case _ if flipToMode == mode => false
          case Open =>
            mode = SwitchMode.Close
            true
          case Close =>
            if (isAvailable(in)) {
              // An element arrived while the valve was closed: release it now.
              push(out, grab(in))
            } else if (isAvailable(out) && !hasBeenPulled(in)) {
              // Downstream demand arrived while closed. Skip the pull if one issued before
              // the valve was closed is still outstanding: a port must not be pulled twice.
              pull(in)
            }
            mode = SwitchMode.Open
            true
        }
        inFlight.remove(promise)
        promise.trySuccess(succeed)
    }

    val switch: ValveSwitch = new ValveSwitch {
      override def flip(flipToMode: SwitchMode): Future[Boolean] = {
        val promise = Promise[Boolean]()
        // Register before invoking so that postStop is guaranteed to see this promise.
        inFlight.add(promise)
        invoke((flipToMode, promise))
        promise.future
      }
    }

    setHandlers(in, out, this)

    @scala.throws[Exception](classOf[Exception])
    override def preStart(): Unit = {
      super.preStart()
      initCallback { request =>
        callback.invoke(request)
      }
    }

    @scala.throws[Exception](classOf[Exception])
    override def postStop(): Unit = {
      // Switch the wrapper to its stopped state first: every later flip is answered right here.
      stopCallback {
        case (_, promise) =>
          inFlight.remove(promise)
          promise.trySuccess(false)
      }
      // Then answer the flips that were handed over before the switch and are still unanswered.
      Iterator
        .continually(inFlight.poll())
        .takeWhile(_ != null)
        .foreach(_.trySuccess(false))
    }

    override def onPush(): Unit = {
      if (isOpen) {
        push(out, grab(in))
      }
    }

    override def onPull(): Unit = {
      if (isOpen) {
        pull(in)
      }
    }

    private def isOpen = mode == SwitchMode.Open
  }
}
/**
 * State of a valve. Sealed so that matches over the mode are checked for exhaustiveness.
 */
sealed trait SwitchMode

object SwitchMode {

  /** Elements pass through the valve. */
  case object Open extends SwitchMode

  /** Elements are held back and the valve backpressures. */
  case object Close extends SwitchMode
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.stream.contrib | |
import akka.actor.ActorSystem | |
import akka.pattern.after | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Keep, Sink, Source} | |
import akka.stream.testkit.scaladsl._ | |
import akka.stream.contrib.SwitchMode.{ Close, Open } | |
import org.scalatest._ | |
import org.scalatest.Matchers._ | |
import org.scalatest.concurrent.ScalaFutures | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
/**
 * Checks that a [[Valve]] switch still answers after the stream it belongs to has failed.
 */
class ValveSpec extends WordSpec with ScalaFutures with BeforeAndAfterAll {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = materializer.executionContext

  // Shut the actor system down once the suite is done so its threads do not outlive the tests.
  override protected def afterAll(): Unit = {
    try {
      scala.concurrent.Await.ready(system.terminate(), 10.seconds)
      ()
    } finally super.afterAll()
  }

  "a valve on an error'd graph" should {
    "return false when trying to flip it" in {
      val ((sourceProbe, switch), sinkProbe) = TestSource.probe[Int]
        .viaMat(Valve(Open))(Keep.both)
        .toMat(TestSink.probe[Int])(Keep.both)
        .run()

      val error = new RuntimeException("Boom !")
      sourceProbe.sendError(error)

      // Wait until the failure has travelled through the valve to the sink.
      sinkProbe.request(1)
        .expectError(error)

      whenReady(switch.flip(Close)) {
        _ shouldBe false
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment