Skip to content

Instantly share code, notes, and snippets.

@ktoso
Last active August 29, 2015 14:27
Show Gist options
  • Save ktoso/363f5bb434ecdc654d27 to your computer and use it in GitHub Desktop.
Save ktoso/363f5bb434ecdc654d27 to your computer and use it in GitHub Desktop.
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.actor._
import akka.stream.Supervision._
import akka.stream.impl._
import akka.stream.impl.fusing.ActorInterpreter
import akka.stream.stage.Stage
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings, Attributes }
import akka.testkit.TestEvent.{ Mute, UnMute }
import akka.testkit.{ EventFilter, TestDuration }
import com.typesafe.config.ConfigFactory
import org.reactivestreams.{ Publisher, Subscriber }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
/**
 * Reproducer spec for a custom supervision `Decider` installed on the materializer.
 *
 * The stream under test emits `1 to 5` through a `map` stage that throws a
 * `NoSuchElementException` for every even element. The decider answers
 * `Supervision.Resume` for that exception type and `Supervision.Stop` for anything else.
 */
class ManuelSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
  import FlowSpec._

  "Manuel" must {
    import java.util.NoSuchElementException
    import akka.stream.scaladsl.{ Flow, FlowGraph, Sink, Source }
    import akka.stream.{ ActorMaterializerSettings, ActorMaterializer, Supervision }

    "work" in {
      // Resume on the exception the `boom` stage throws on purpose; stop on anything unexpected.
      val decider: Supervision.Decider = {
        case _: NoSuchElementException ⇒
          info("Caught a NoSuchElementException")
          Supervision.Resume
        case _ ⇒
          info("Don't take life too seriously, nobody gets out alive anyway")
          Supervision.Stop
      }

      // The decider is applied materializer-wide through the settings.
      implicit val fm = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))

      // Reports every element that reaches the sink; its materialized value is
      // the future awaited at the end of the test.
      val out = Sink.foreach[Int](i ⇒ info(i.toString))

      val f = FlowGraph.closed(out) { implicit b ⇒
        o ⇒
          import FlowGraph.Implicits._
          val in = Source(1 to 5)
          // Fails on even elements so that the decider gets consulted.
          val boom = Flow[Int].map { e ⇒
            if (e % 2 == 0) throw new NoSuchElementException("oops")
            else e
          }
          in ~> boom ~> o
      }

      import scala.concurrent.duration._
      // Await.result (unlike Await.ready) rethrows the failure of a failed future,
      // so a stream that was stopped with an error now fails the test instead of
      // passing silently.
      Await.result(f.run(), 1.minute)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment