Created
April 7, 2017 05:57
-
-
Save joost-de-vries/f1ae7b5e96025e8dfa8bf828f0d767b5 to your computer and use it in GitHub Desktop.
Swave issue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import swave.core.{Coupling, Spout, StreamEnv, UnterminatedSynchronousStreamException} | |
| import scala.concurrent.{Await, Future, TimeoutException} | |
| import scala.concurrent.duration._ | |
| import scala.util.{Success, Try} | |
/** Common supertype of the business events that the message stream is split into. */
trait MyEvent

/** Event created by `toBusinessEvent` for the message "a". */
class EventA extends MyEvent

/** Event created by `toBusinessEvent` for the message "b". */
class EventB extends MyEvent

/** Event created by `toBusinessEvent` for the message "c". */
class EventC extends MyEvent
/**
  * Runnable demonstration of two fan-out/fan-in pipelines built from the same input.
  *
  * The first pipeline (built with [[Fanout]]) is expected to fail with an
  * `UnterminatedSynchronousStreamException`; the second (built with [[FanoutNoError]])
  * is expected to fail with a `TimeoutException`. Both expectations are asserted.
  */
object SwaveIssue extends App {
  // Explicit type so implicit resolution does not depend on inference.
  implicit val streamEnv: StreamEnv = StreamEnv()

  /** Ten copies of the message "a". */
  private def messages(): Seq[String] =
    (1 to 10)
      .map(_ => "a")

  // The stream environment must be shut down even when one of the assertions
  // below throws; otherwise it is never released on failure.
  try {
    val result1 = versionWithError(messages())
    assert(
      result1.failed.map(_.getClass) == Success(classOf[UnterminatedSynchronousStreamException]),
      s"expected UnterminatedSynchronousStreamException but got $result1")

    val result2 = versionWithTimeout(messages())
    assert(
      result2.failed.map(_.getClass) == Success(classOf[TimeoutException]),
      s"expected TimeoutException but got $result2")
  } finally {
    println("shutting down") // scalastyle:ignore
    streamEnv.shutdown()
  }

  /**
    * Runs the pipeline whose events are produced through [[Fanout]].
    *
    * @param messages raw messages fed into the fan-out
    * @return the outcome of building, running and awaiting the stream, captured in a `Try`
    */
  private def versionWithError(messages: Seq[String]): Try[Unit] = Try {
    mergeAndDrain(Fanout.toEventSpouts(Spout.fromIterable(messages)), "first")
  }

  /**
    * Runs the pipeline whose events are produced through [[FanoutNoError]].
    *
    * @param messages raw messages fed into the fan-out
    * @return the outcome of building, running and awaiting the stream, captured in a `Try`
    */
  private def versionWithTimeout(messages: Seq[String]): Try[Unit] = Try {
    mergeAndDrain(FanoutNoError.toEventSpouts(Spout.fromIterable(messages)), "second")
  }

  /**
    * Merges the three typed spouts back into one stream, prints every element,
    * drains the stream and blocks for at most three seconds on its completion.
    *
    * @param spouts the three spouts returned by one of the fan-out helpers
    * @param label  word inserted into the printed line ("first" / "second")
    */
  private def mergeAndDrain(
      spouts: (Spout[EventA], Spout[EventB], Spout[EventC]),
      label: String): Unit = {
    val (spoutA, spoutB, spoutC) = spouts
    val r = spoutA
      .attach(spoutB)
      .attach(spoutC)
      .fanInMerge(eagerComplete = false)
      .onElement { e =>
        println(s"in $label version: event $e") // scalastyle:ignore
      }
      .drainToBlackHole()
    Await.result(r, 3.seconds)
  }
}
/** Fan-out helper that converts each message with a `Spout`-returning function via `flatMap`. */
object Fanout {

  /**
    * Converts every message to a business event, broadcasts the events to three
    * sub-streams, filters each sub-stream down to one event class and feeds it
    * into a `Coupling`.
    *
    * @param queueSpout the raw messages
    * @return the output sides of the three couplings, one per event class
    */
  def toEventSpouts(queueSpout: Spout[String]): (Spout[EventA], Spout[EventB], Spout[EventC]) = {
    val aCoupling = Coupling[EventA]
    val bCoupling = Coupling[EventB]
    val cCoupling = Coupling[EventC]

    queueSpout
      .flatMap(toBusinessEvent)
      .fanOutBroadcast(eagerCancel = true)
      .sub.filter[EventA].to(aCoupling.in)
      .sub.filter[EventB].to(bCoupling.in)
      .sub.filter[EventC].to(cCoupling.in)

    (aCoupling.out, bCoupling.out, cCoupling.out)
  }

  /**
    * Maps a raw message to a spout holding the matching event.
    *
    * @param message "a", "b" or "c"
    * @return a single-element spout for a known message, otherwise a spout that
    *         fails with an `IllegalArgumentException`
    */
  def toBusinessEvent(message: String): Spout[MyEvent] = message match {
    case "a" => Spout.one(new EventA)
    case "b" => Spout.one(new EventB)
    case "c" => Spout.one(new EventC)
    case _   => Spout.failing(new IllegalArgumentException(s"unexpected message $message"))
  }
}
/** Fan-out helper that converts each message with a `Future`-returning function via `mapAsync`. */
object FanoutNoError {

  /**
    * Converts every message to a business event, broadcasts the events to three
    * sub-streams, filters each sub-stream down to one event class and feeds it
    * into a `Coupling`.
    *
    * @param queueSpout the raw messages
    * @return the output sides of the three couplings, one per event class
    */
  def toEventSpouts(queueSpout: Spout[String]): (Spout[EventA], Spout[EventB], Spout[EventC]) = {
    val aCoupling = Coupling[EventA]
    val bCoupling = Coupling[EventB]
    val cCoupling = Coupling[EventC]

    queueSpout
      .mapAsync(1)(toBusinessEvent) // mapAsync instead of flatMap
      .fanOutBroadcast(eagerCancel = true)
      .sub.filter[EventA].to(aCoupling.in)
      .sub.filter[EventB].to(bCoupling.in)
      .sub.filter[EventC].to(cCoupling.in)

    (aCoupling.out, bCoupling.out, cCoupling.out)
  }

  /**
    * Maps a raw message to an already-completed future holding the matching event.
    * Unlike `Fanout.toBusinessEvent`, this returns a `Future` rather than a `Spout`.
    *
    * @param message "a", "b" or "c"
    * @return a successful future for a known message, otherwise a future failed
    *         with an `IllegalArgumentException`
    */
  def toBusinessEvent(message: String): Future[MyEvent] = message match {
    case "a" => Future.successful(new EventA)
    case "b" => Future.successful(new EventB)
    case "c" => Future.successful(new EventC)
    case _   => Future.failed(new IllegalArgumentException(s"unexpected message $message"))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment