Created
May 27, 2014 13:52
-
-
Save rocketraman/a7a3f686af887251af6a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mypackage.web | |
import akka.actor._ | |
import akka.io.IO | |
import akka.pattern.{AskTimeoutException, ask, gracefulStop, pipe} | |
import com.typesafe.scalalogging.slf4j.Logger | |
import javax.inject.Inject | |
import org.slf4j.LoggerFactory | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, Future} | |
import spray.can.Http | |
import spray.can.server.Stats | |
import RouteActorV1 | |
/**
 * Control messages, naming constants and the manager actor for the HTTP listener.
 *
 * The manager actor binds a spray-can listener on `StartMsg`, tracks the listener
 * reference it receives back, and tears everything down on `StopMsg`.
 */
object WebServer {

  /** Tells the manager actor to bind the HTTP listener. */
  object StartMsg

  /** Tells the manager actor to close all connections and then stop itself. */
  object StopMsg

  /** Sent to the manager by its own startup timer when no bind result arrived in time. */
  object ListenerStartupFailureMsg

  val actorName = "server-manager"
  val actorPath = s"/user/$actorName"

  /**
   * Owns the lifecycle of the spray-can HTTP listener.
   *
   * State:
   *  - `httpListener` is `null` until an `Http.Bound` reply has been received.
   *  - `listenerStartupTimer` is non-null only while a bind attempt is pending.
   */
  class ServerManagerActor extends Actor {
    private final val log = Logger(LoggerFactory.getLogger(classOf[ServerManagerActor]))

    val routeService = context.actorOf(Props[RouteActorV1], "route-service")

    var httpListener: ActorRef = _
    var listenerStartupTimer: Cancellable = _

    implicit def executionContext = context.dispatcher

    /** Cancels the pending startup timer, if any, and clears the reference. Safe to call repeatedly. */
    private def cancelStartupTimer(): Unit = {
      Option(listenerStartupTimer).foreach(_.cancel())
      listenerStartupTimer = null
    }

    /** Ensures the startup timer cannot fire at this actor after it has been stopped. */
    override def postStop(): Unit = {
      cancelStartupTimer()
      super.postStop()
    }

    def receive: Receive = {
      case StartMsg =>
        // drop any timer left over from an earlier StartMsg so only one is ever pending
        cancelStartupTimer()
        // send status back to us
        IO(Http)(context.system).tell(Http.Bind(routeService, "0.0.0.0", port = 8080), self)
        // if Spray listener doesn't start for some reason (e.g. an error in the config we don't get any message)
        // if we don't get a reference to the listener in 5s, log an error and shutdown
        listenerStartupTimer = context.system.scheduler.scheduleOnce(5.seconds, self, ListenerStartupFailureMsg)

      case _: Http.Bound =>
        cancelStartupTimer()
        httpListener = sender()
        context.watch(httpListener)
        log.info("Bound to port successfully. Ready to receive requests.")

      case _: Http.CommandFailed =>
        // the bind attempt has completed (unsuccessfully), so the startup timer is no longer needed
        cancelStartupTimer()
        // TODO: replace by actual exception when Akka #3861 is fixed.
        // see https://www.assembla.com/spaces/akka/tickets/3861
        log.error("Port binding failed. Switch on DEBUG-level logging for `akka.io.TcpListener` to log the cause.")
        self ! StopMsg

      case StopMsg =>
        log.info("Closing all connections to HTTP listener.")
        IO(Http)(context.system) ! Http.CloseAll

      case Http.ClosedAll =>
        // it seems the CloseAll also unbinds the port, which is a little weird
        //httpListener ! Http.Unbind
        self ! Http.Unbound

      case Http.Unbound =>
        log.info("All connections closed, stopping listener manager.")
        context.stop(self)

      case Http.GetStats =>
        Option(httpListener) match {
          case Some(listener) =>
            listener.ask(Http.GetStats)(1.second).mapTo[Stats].pipeTo(sender())
          case None =>
            // not bound yet: fail the request explicitly instead of throwing a NullPointerException
            log.warn("Stats requested before the HTTP listener was bound.")
            sender() ! Status.Failure(new IllegalStateException("HTTP listener is not bound"))
        }

      case Http.ClearStats =>
        Option(httpListener) match {
          case Some(listener) => listener ! Http.ClearStats
          case None => log.warn("Clear-stats requested before the HTTP listener was bound; ignoring.")
        }

      case ListenerStartupFailureMsg if httpListener == null =>
        log.warn("HTTP listener did not startup? Shutting down.")
        context.stop(self)

      case ListenerStartupFailureMsg =>
        // the timer fired concurrently with a successful bind; the listener is up, so ignore it
        log.debug("Ignoring stale listener startup timeout; listener is already bound.")

      case Terminated(a) if a == httpListener =>
        log.warn("HTTP listener was terminated. Shutting down.")
        context.stop(self)

      case m => log.info(s"Unhandled message: $m")
    }
  }
}
/**
 * Starts and stops the HTTP server by messaging a `ServerManagerActor`
 * created in the given actor system under the name `WebServer.actorName`.
 *
 * NOTE(review): `Service` is declared elsewhere; its contract for `start()`/`stop()`
 * is not visible here.
 *
 * @param system actor system in which the manager actor is created
 */
class WebServer (val system: ActorSystem) extends Service {
  import WebServer._

  private final val log = Logger(LoggerFactory.getLogger(classOf[WebServer]))

  val serverManager = system.actorOf(Props[ServerManagerActor], actorName)

  /** Fire-and-forget request for the manager to bind the listener. */
  override def start() = serverManager ! StartMsg

  /**
   * Sends `StopMsg` to the manager via `gracefulStop` (5 second limit) and blocks
   * for up to 6 seconds until it has terminated. A timeout is logged, not rethrown.
   */
  override def stop() = {
    val managerTerminated = gracefulStop(serverManager, 5.seconds, StopMsg)
    try Await.result(managerTerminated, 6.seconds)
    catch {
      case _: AskTimeoutException =>
        log.warn("Timeout waiting for Server Manager to shutdown.")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment