Skip to content

Instantly share code, notes, and snippets.

@longshorej
Last active May 31, 2016 16:04
Show Gist options
  • Save longshorej/5423bfe63018c2b8d6a1 to your computer and use it in GitHub Desktop.
Save longshorej/5423bfe63018c2b8d6a1 to your computer and use it in GitHub Desktop.
akka http statistics
package com.hpn.libs.akka.http
// No time to open source this properly, but maybe someone will find it useful. It's used
// as part of our graceful shutdown logic for Akka HTTP.
import akka.http.scaladsl.model._
import akka.stream._
import akka.stream.stage._
import java.util.concurrent.atomic.AtomicLong
/**
 * Counts how many times the stage returned by [[countRequests]] has been started and stopped.
 *
 * Both counters are `AtomicLong`s, so they may be updated from several stream
 * materializations at once and read from any thread via [[get]].
 *
 * NOTE(review): the counter names suggest the stage is meant to be placed on the
 * per-connection request flow, so that one materialization equals one connection —
 * confirm against the call site.
 */
class Statistics {
  private val connectionsOpened = new AtomicLong
  private val connectionsClosed = new AtomicLong

  /**
   * Resets both counters to zero.
   *
   * The two resets are not performed as one atomic step. Stages that are still running
   * when this is called will later increment only the "closed" counter, so
   * `openConnections` can be reported as negative afterwards.
   */
  def clear(): Unit = {
    connectionsOpened.set(0L)
    connectionsClosed.set(0L)
  }

  /**
   * Returns a snapshot of the counters.
   *
   * @return a [[Stats]] holding the number of stages started so far and the number of
   *         stages started but not yet stopped
   */
  def get: Stats = {
    // Read `closed` before `opened`, and read each counter exactly once. Every stage bumps
    // `opened` in preStart before it can bump `closed` in postStop, so a value of `closed`
    // observed earlier never exceeds a value of `opened` observed later. That keeps the
    // difference non-negative (unless clear() interleaves) and guarantees both fields of
    // the result are derived from the same `opened` reading.
    val closed = connectionsClosed.get
    val opened = connectionsOpened.get
    Stats(opened, opened - closed)
  }

  /**
   * Creates a pass-through stage for `HttpRequest`s that records its own lifecycle.
   *
   * Elements are forwarded unchanged. Each materialization increments the "opened" counter
   * when it starts and the "closed" counter when it stops.
   *
   * @return a new, unmaterialized stage; every call returns a fresh instance
   */
  def countRequests(): GraphStage[FlowShape[HttpRequest, HttpRequest]] = {
    new GraphStage[FlowShape[HttpRequest, HttpRequest]] {
      val in = Inlet[HttpRequest]("request.in")
      val out = Outlet[HttpRequest]("request.out")

      override def shape: FlowShape[HttpRequest, HttpRequest] = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) with InHandler with OutHandler {

          override def preStart(): Unit = {
            connectionsOpened.incrementAndGet()
            () // discard the returned count
          }

          override def postStop(): Unit = {
            connectionsClosed.incrementAndGet()
            () // discard the returned count
          }

          // Downstream demand is relayed straight to upstream.
          override def onPull(): Unit = {
            pull(in)
          }

          // Requests pass through untouched.
          override def onPush(): Unit = {
            push(out, grab(in))
          }

          // Intentionally a no-op: the inherited InHandler behaviour would complete the
          // stage here. Overriding it keeps the stage (and therefore the "open" count)
          // alive after upstream finishes.
          // NOTE(review): presumably the stage is then stopped by downstream cancellation
          // once the connection is fully torn down — confirm.
          override def onUpstreamFinish(): Unit = {
            ()
          }

          setHandlers(in, out, this)
        }
    }
  }
}
/**
 * Immutable snapshot of connection counters.
 *
 * @param totalConnections number of connections counted as opened
 * @param openConnections  number of connections counted as opened minus those counted as closed
 */
case class Stats(
  totalConnections: Long,
  openConnections: Long
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment