Skip to content

Instantly share code, notes, and snippets.

@joost-de-vries
Created April 7, 2017 05:57
Show Gist options
  • Select an option

  • Save joost-de-vries/f1ae7b5e96025e8dfa8bf828f0d767b5 to your computer and use it in GitHub Desktop.

Select an option

Save joost-de-vries/f1ae7b5e96025e8dfa8bf828f0d767b5 to your computer and use it in GitHub Desktop.
Swave issue
import swave.core.{Coupling, Spout, StreamEnv, UnterminatedSynchronousStreamException}
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.util.{Success, Try}
trait MyEvent
class EventA extends MyEvent
class EventB extends MyEvent
class EventC extends MyEvent
object SwaveIssue extends App {
implicit val streamEnv = StreamEnv()
private def messages() =
(1 to 10)
.map(_ => "a")
val result1 = versionWithError(messages())
assert(result1.failed.map(_.getClass) == Success(classOf[UnterminatedSynchronousStreamException]))
val result2 = versionWithTimeout(messages())
assert(result2.failed.map(_.getClass) == Success(classOf[TimeoutException]))
println("shutting down") // scalastyle:ignore
streamEnv.shutdown()
private def versionWithError(messages: Seq[String]): Try[Unit] = Try {
val (spoutA, spoutB, spoutC) = Fanout.toEventSpouts(Spout.fromIterable(messages))
val r = spoutA
.attach(spoutB)
.attach(spoutC)
.fanInMerge(eagerComplete = false)
.onElement { e =>
println(s"in first version: event $e") // scalastyle:ignore
}
.drainToBlackHole()
Await.result(r, 3.seconds)
}
private def versionWithTimeout(messages: Seq[String]): Try[Unit] = Try {
val (spoutA, spoutB, spoutC) = FanoutNoError.toEventSpouts(Spout.fromIterable(messages))
val r = spoutA
.attach(spoutB)
.attach(spoutC)
.fanInMerge(eagerComplete = false)
.onElement { e =>
println(s"in second version: event $e") // scalastyle:ignore
}
.drainToBlackHole()
Await.result(r, 3.seconds)
}
}
object Fanout {
def toEventSpouts(queueSpout: Spout[String]): (Spout[EventA], Spout[EventB], Spout[EventC]) = {
val couplingA = Coupling[EventA]
val couplingB = Coupling[EventB]
val couplingC = Coupling[EventC]
queueSpout
.flatMap(toBusinessEvent)
.fanOutBroadcast(eagerCancel = true)
.sub
.filter[EventA]
.to(couplingA.in)
.sub
.filter[EventB]
.to(couplingB.in)
.sub
.filter[EventC]
.to(couplingC.in)
(couplingA.out, couplingB.out, couplingC.out)
}
// returrns a spout
def toBusinessEvent(message: String): Spout[MyEvent] = {
message match {
case "a" => Spout.one(new EventA)
case "b" => Spout.one(new EventB)
case "c" => Spout.one(new EventC)
case _ => Spout.failing(new IllegalArgumentException(s"unexpected message $message"))
}
}
}
object FanoutNoError {
def toEventSpouts(queueSpout: Spout[String]): (Spout[EventA], Spout[EventB], Spout[EventC]) = {
val couplingA = Coupling[EventA]
val couplingB = Coupling[EventB]
val couplingC = Coupling[EventC]
queueSpout
.mapAsync(1)(toBusinessEvent) // mapAsync instead of flatMap
.fanOutBroadcast(eagerCancel = true)
.sub
.filter[EventA]
.to(couplingA.in)
.sub
.filter[EventB]
.to(couplingB.in)
.sub
.filter[EventC]
.to(couplingC.in)
(couplingA.out, couplingB.out, couplingC.out)
}
// the difference with the previous version is that here we're returning a future
def toBusinessEvent(message: String): Future[MyEvent] = {
message match {
case "a" => Future.successful(new EventA)
case "b" => Future.successful(new EventB)
case "c" => Future.successful(new EventC)
case _ => Future.failed(new IllegalArgumentException(s"unexpected message $message"))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment