Created
January 26, 2017 10:22
-
-
Save IgorBerman/191e514d0245dc9ab2c2ccddb3e85cb0 to your computer and use it in GitHub Desktop.
akka streams in java with kill switch, shutdown hook and error handling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import java.util.concurrent.CompletionStage; | |
import scala.concurrent.Await; | |
import scala.concurrent.duration.Duration; | |
import akka.Done; | |
import akka.NotUsed; | |
import akka.actor.ActorSystem; | |
import akka.japi.Pair; | |
import akka.japi.function.Function; | |
import akka.stream.ActorMaterializer; | |
import akka.stream.ActorMaterializerSettings; | |
import akka.stream.FlowShape; | |
import akka.stream.Graph; | |
import akka.stream.KillSwitches; | |
import akka.stream.Materializer; | |
import akka.stream.Supervision; | |
import akka.stream.UniqueKillSwitch; | |
import akka.stream.Supervision.Directive; | |
import akka.stream.javadsl.Flow; | |
import akka.stream.javadsl.Keep; | |
import akka.stream.javadsl.RunnableGraph; | |
import akka.stream.javadsl.Sink; | |
import akka.stream.javadsl.Source; | |
import com.typesafe.config.Config; | |
import com.typesafe.config.ConfigFactory; | |
public class ExceptionHandlingExample { | |
public static void main(String[] args) throws Exception { | |
Config config = ConfigFactory.parseString("akka { loglevel = \"DEBUG\" }").withFallback(ConfigFactory.load("QuickStart")); | |
final ActorSystem system = ActorSystem.create("QuickStart", config); | |
final Function<Throwable, Directive> decider = exc -> { | |
System.out.println(exc); | |
if (exc instanceof ArithmeticException) { | |
return Supervision.resume(); | |
} else if (exc instanceof Exception) { | |
return Supervision.resume(); | |
} | |
return Supervision.stop(); | |
}; | |
final Materializer mat = ActorMaterializer.create(ActorMaterializerSettings.create(system).withDebugLogging(true).withSupervisionStrategy(decider), system); | |
Graph<FlowShape<Integer, Integer>, UniqueKillSwitch> killSwitch = KillSwitches.single(); | |
//take some source | |
Source<Integer,NotUsed> source = Source.range(1, 100); | |
//attach ability to stop it with kill switch | |
Source<Integer, UniqueKillSwitch> sourceWithKillSwitch = source.viaMat(killSwitch, Keep.right()); | |
//this is custom logic that might throw different kinds of errors | |
Flow<Integer,String,NotUsed> map = Flow.<Integer>create().map(x->{ | |
Thread.sleep(1000);//only for testing | |
if (x == 99) { | |
throw new java.lang.Error("Got:"+x);//error | |
} | |
if (x % 7 == 0) { | |
throw new java.lang.ArithmeticException("Got:"+x);//runtime | |
} | |
if (x % 11 == 0) { | |
throw new java.io.IOException("Got:"+x);//checked | |
} | |
return x; | |
}) | |
//map only those that "ok" to string | |
.map(x -> (x * 2)+"\n"); | |
Source<String, UniqueKillSwitch> sourceAfterMapping = sourceWithKillSwitch.via(map); | |
Sink<String, CompletionStage<Done>> consoleSink = Sink.foreach(System.out::print); | |
//create graph | |
RunnableGraph<Pair<UniqueKillSwitch,CompletionStage<Done>>> graph = sourceAfterMapping.toMat(consoleSink, Keep.both()); | |
//and run it when we got materialization hooks for killing and for listening for completion | |
Pair<UniqueKillSwitch, CompletionStage<Done>> switchWithCompletion = graph.run(mat); | |
//"enrich" completion handler with propogated failures(see decider above !ArithmeticException and !Exception) | |
switchWithCompletion.second().exceptionally(new java.util.function.Function<Throwable, Done>() { | |
@Override | |
public Done apply(Throwable t) { | |
System.out.println("Encountered Exception that stops stream, we are terminating everything " + t + " " + t.getCause()); | |
system.terminate(); | |
return null; | |
} | |
}); | |
//shutdown hook for CtrlC or service restarts(i.e. graceful shutdown) | |
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { | |
@Override | |
public void run() { | |
System.out.println("Got Cntrl-C, shutting down by kill switch"); | |
switchWithCompletion.first().shutdown(); | |
} | |
})); | |
//after all setup is done, let's wait for the system to be terminated | |
Await.ready(system.whenTerminated(), Duration.Inf()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment