Skip to content

Instantly share code, notes, and snippets.

@rocketraman
Created May 27, 2014 13:52
Show Gist options
  • Save rocketraman/a7a3f686af887251af6a to your computer and use it in GitHub Desktop.
Save rocketraman/a7a3f686af887251af6a to your computer and use it in GitHub Desktop.
package mypackage.web
import akka.actor._
import akka.io.IO
import akka.pattern.{AskTimeoutException, ask, gracefulStop, pipe}
import com.typesafe.scalalogging.slf4j.Logger
import javax.inject.Inject
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import spray.can.Http
import spray.can.server.Stats
import RouteActorV1
/**
 * Message protocol and manager actor for the spray-can HTTP listener.
 *
 * The manager actor is created under the name [[WebServer.actorName]]; [[WebServer.actorPath]]
 * is that name formatted as a `/user/...` path.
 */
object WebServer {

  /** Asks the manager to bind the HTTP listener. */
  case object StartMsg

  /** Asks the manager to close all HTTP connections; the manager stops itself once they are closed. */
  case object StopMsg

  /** Sent by the manager to itself when no `Http.Bound` arrived within the startup window. */
  case object ListenerStartupFailureMsg

  val actorName: String = "server-manager"
  val actorPath: String = "/user/%s".format(actorName)

  /**
   * Binds the route service to port 8080 on all interfaces, watches the resulting listener,
   * forwards stats requests to it and stops itself when the listener goes away or is closed.
   */
  class ServerManagerActor extends Actor {
    private final val log = Logger(LoggerFactory.getLogger(classOf[ServerManagerActor]))

    val routeService = context.actorOf(Props[RouteActorV1], "route-service")

    // Both fields are null until assigned. Every read below goes through Option(...) or a null
    // check, so messages that arrive before (or without) a successful bind cannot cause an NPE.
    var httpListener: ActorRef = _
    var listenerStartupTimer: Cancellable = _

    implicit def executionContext = context.dispatcher

    /** Cancels the pending startup timer, if there is one, and forgets it. */
    private def cancelStartupTimer(): Unit = {
      Option(listenerStartupTimer).foreach(_.cancel())
      listenerStartupTimer = null
    }

    /** Makes sure a still-pending startup timer does not fire at a stopped actor. */
    override def postStop(): Unit = cancelStartupTimer()

    def receive = {
      case StartMsg =>
        // A repeated StartMsg must not leave the previous timer running unreferenced.
        cancelStartupTimer()
        // Pass `self` as the sender explicitly so the bind status comes back to this actor.
        IO(Http)(context.system).tell(Http.Bind(routeService, "0.0.0.0", port = 8080), self)
        // If the Spray listener doesn't start for some reason (e.g. an error in the config) we
        // don't get any message. If we don't get a reference to the listener in 5s, the timer
        // message below makes us log the problem and shut down.
        listenerStartupTimer =
          context.system.scheduler.scheduleOnce(5.seconds, self, ListenerStartupFailureMsg)

      case _: Http.Bound =>
        cancelStartupTimer()
        httpListener = sender()
        context.watch(httpListener)
        log.info("Bound to port successfully. Ready to receive requests.")

      case _: Http.CommandFailed =>
        // TODO: replace by actual exception when Akka #3861 is fixed.
        // see https://www.assembla.com/spaces/akka/tickets/3861
        log.error("Port binding failed. Switch on DEBUG-level logging for `akka.io.TcpListener` to log the cause.")
        self ! StopMsg

      case StopMsg =>
        log.info("Closing all connections to HTTP listener.")
        IO(Http)(context.system) ! Http.CloseAll

      case Http.ClosedAll =>
        // It seems the CloseAll also unbinds the port, which is a little weird, so no explicit
        // Http.Unbind is sent to the listener; we just continue as if Unbound had arrived.
        //httpListener ! Http.Unbind
        self ! Http.Unbound

      case Http.Unbound =>
        log.info("All connections closed, stopping listener manager.")
        context.stop(self)

      case Http.GetStats =>
        val replyTo = sender()
        Option(httpListener) match {
          case Some(listener) =>
            listener.ask(Http.GetStats)(1.second).mapTo[Stats].pipeTo(replyTo)
          case None =>
            // Not bound (yet): fail the request explicitly instead of crashing on a null listener.
            replyTo ! Status.Failure(
              new IllegalStateException("HTTP listener is not bound; no stats available."))
        }

      case Http.ClearStats =>
        // Nothing to clear while no listener is bound.
        Option(httpListener).foreach(_ ! Http.ClearStats)

      case ListenerStartupFailureMsg =>
        // The timer message may already have been enqueued when Http.Bound cancelled the timer;
        // in that case the listener is up and the stale message must not shut us down.
        if (httpListener == null) {
          log.warn("HTTP listener did not startup? Shutting down.")
          context.stop(self)
        }

      case Terminated(a) if a == httpListener =>
        log.warn("HTTP listener was terminated. Shutting down.")
        context.stop(self)

      case m => log.info(s"Unhandled message: $m")
    }
  }
}
/**
 * Starts and stops the HTTP server by messaging a `ServerManagerActor` created in `system`.
 *
 * NOTE(review): `Service` is declared elsewhere; `start()`/`stop()` are assumed to be its
 * lifecycle hooks with a `Unit`-compatible result type — confirm against the trait.
 *
 * @param system actor system in which the server manager actor is created
 */
class WebServer (val system: ActorSystem) extends Service {
  import WebServer._

  private final val log = Logger(LoggerFactory.getLogger(classOf[WebServer]))

  val serverManager = system.actorOf(Props[ServerManagerActor], actorName)

  /** Asks the server manager to bind the HTTP listener; returns without waiting for the bind. */
  override def start(): Unit = {
    serverManager ! StartMsg
  }

  /**
   * Sends `StopMsg` to the server manager and blocks until it has terminated.
   * A timeout is logged as a warning rather than propagated to the caller.
   */
  override def stop(): Unit = {
    val stoppedManager: Future[Boolean] = gracefulStop(serverManager, 5.seconds, StopMsg)
    try {
      // Wait slightly longer than gracefulStop's own 5s timeout so its failure normally wins.
      Await.result(stoppedManager, 6.seconds)
      ()
    } catch {
      // gracefulStop fails its future with AskTimeoutException; Await.result throws a plain
      // TimeoutException when its own wait elapses first. Both mean the same thing here.
      case _: AskTimeoutException | _: java.util.concurrent.TimeoutException =>
        log.warn("Timeout waiting for Server Manager to shutdown.")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment