Created
August 5, 2017 10:24
-
-
Save khajavi/f0b8b76e4df9f639687ad82a5bf628f9 to your computer and use it in GitHub Desktop.
Three flavours of request-response pattern in Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// More info: http://www.nurkiewicz.com/2014/01/three-flavours-of-request-response.html | |
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Cancellable, Props, Terminated} | |
import akka.event.LoggingReceive | |
import akka.util.Timeout | |
import akka.pattern.pipe | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
object Main2 { | |
def main(args: Array[String]): Unit = { | |
val system = ActorSystem("Hello") | |
val a = system.actorOf(Props[HelloWorld], "helloWorld") | |
system.actorOf(Props(classOf[Terminator], a), "terminator") | |
} | |
class Terminator(ref: ActorRef) extends Actor with ActorLogging { | |
context watch ref | |
def receive = { | |
case Terminated(_) => | |
log.info("{} has terminated, shutting down system", ref.path) | |
context.system.terminate() | |
} | |
} | |
} | |
import akka.actor.{ActorLogging, ActorRef} | |
import akka.pattern.ask | |
import scala.util.{Failure, Success} | |
case class CheckHealth() | |
case class Ping() | |
case class Pong() | |
case class Up() | |
case class Down() | |
class MonitoringActor1 extends Actor with ActorLogging { | |
private val networkActor = context.actorOf(Props[NetworkActor], "network") | |
private var origin: Option[ActorRef] = None | |
def receive = { | |
case CheckHealth => | |
networkActor ! Ping | |
origin = Some(sender) | |
case Pong => | |
origin.foreach(_ ! Up) | |
origin = None | |
} | |
} | |
class MonitoringActor2 extends Actor with ActorLogging { | |
private val networkActor = context.actorOf(Props[NetworkActor], "network") | |
def receive = waitingForCheckHealth | |
private def waitingForCheckHealth: Receive = { | |
case CheckHealth => | |
networkActor ! Ping | |
context become waitingForPong(sender) | |
} | |
private def waitingForPong(origin: ActorRef): Receive = { | |
case Pong => | |
origin ! Up | |
context become waitingForCheckHealth | |
} | |
} | |
class MonitoringActor3 extends Actor with ActorLogging { | |
private val networkActor = context.actorOf(Props[NetworkActor], "network") | |
def receive = waitingForCheckHealth | |
private def waitingForCheckHealth: Receive = { | |
case CheckHealth => | |
networkActor ! Ping | |
implicit val ec = context.dispatcher | |
val timeout = context.system.scheduler.scheduleOnce(2.second, self, Down) | |
context become waitingForPong(sender, timeout) | |
} | |
private def waitingForPong(origin: ActorRef, timeout: Cancellable): Receive = LoggingReceive { | |
case Pong => | |
timeout cancel() | |
origin ! Up | |
context become receive | |
case Down => | |
origin ! Down | |
context become receive | |
} | |
} | |
class MonitoringActor4 extends Actor with ActorLogging { | |
private val networkActor = context.actorOf(Props[NetworkActor], "network") | |
def receive = { | |
case CheckHealth => | |
implicit val timeout: Timeout = 1.second | |
implicit val ec = context.dispatcher | |
val origin = sender | |
networkActor ? Ping andThen { | |
case Success(_) => origin ! Up | |
case Failure(_) => origin ! Down | |
} | |
} | |
} | |
class MonitoringActor5 extends Actor with ActorLogging { | |
private val networkActor = context.actorOf(Props[NetworkActor], "network") | |
def receive = LoggingReceive { | |
case CheckHealth => | |
implicit val ec = context.dispatcher | |
networkActor.ask(Ping)(4.second). | |
map { _ => Up }. | |
recover { case _ => Down }. | |
pipeTo(sender) | |
} | |
} | |
class NetworkActor extends Actor with ActorLogging { | |
override def receive: Receive = { | |
case Ping => | |
Thread.sleep(3000) | |
sender() ! Pong | |
} | |
} | |
object Solution1 extends App { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
implicit val system = ActorSystem() | |
private val monitoringActor = system.actorOf(Props[MonitoringActor1], "network") | |
implicit val timeout = Timeout(10 second) | |
(monitoringActor ? CheckHealth).onComplete { | |
case Success(v) => println(v) | |
case Failure(e) => println(e) | |
} | |
} | |
object Solution2 extends App { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
implicit val system = ActorSystem() | |
private val monitoringActor = system.actorOf(Props[MonitoringActor2], "network") | |
implicit val timeout = Timeout(10 second) | |
(monitoringActor ? CheckHealth).onComplete { | |
case Success(v) => println(v) | |
case Failure(e) => println(e) | |
} | |
} | |
object Solution3 extends App { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
implicit val system = ActorSystem() | |
private val monitoringActor = system.actorOf(Props[MonitoringActor3], "network") | |
implicit val timeout = Timeout(10 second) | |
(monitoringActor ? CheckHealth) onComplete { | |
case Success(v) => println(v) | |
case Failure(e) => println(e) | |
} | |
} | |
object Solution4 extends App { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
implicit val system = ActorSystem() | |
private val monitoringActor = system.actorOf(Props[MonitoringActor4], "network") | |
implicit val timeout = Timeout(10 second) | |
(monitoringActor ? CheckHealth) onComplete { | |
case Success(v) => println(v) | |
case Failure(e) => println(e) | |
} | |
} | |
object Solution5 extends App { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
implicit val system = ActorSystem() | |
private val monitoringActor = system.actorOf(Props[MonitoringActor5], "network") | |
implicit val timeout = Timeout(10 second) | |
(monitoringActor ? CheckHealth) onComplete { | |
case Success(v) => println(v) | |
case Failure(e) => println(e) | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment