Last active
September 12, 2019 10:04
-
-
Save zsedem/7fbab5e62335c907aeafcda3da04de47 to your computer and use it in GitHub Desktop.
An akka http server, which keeps track of opened connections and requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit} | |
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.{HttpRequest, HttpResponse} | |
import akka.http.scaladsl.server.Directives._ | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Keep, Sink} | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.{Await, ExecutionContext, Future} | |
object AkkaConnectionMonitoringServer extends App { | |
val response = args.headOption.getOrElse("No response provided") | |
implicit val system: ActorSystem = ActorSystem("helloAkkaHttpServer") | |
implicit val ec = ExecutionContext.fromExecutor( | |
new ThreadPoolExecutor( | |
100, 100, 0L, | |
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](10000))) | |
implicit val materializer: ActorMaterializer = ActorMaterializer() | |
val connections = new AtomicInteger(0) | |
val requests = new AtomicInteger(0) | |
Http().bind("0.0.0.0", port = 8080).runWith(Sink foreach { conn => | |
println("connection opened") | |
connections.incrementAndGet() | |
conn.flow.watchTermination()({ case (_, done) => | |
done.onComplete(_ => { | |
println("connection closed") | |
connections.decrementAndGet() | |
}) | |
done | |
}).joinMat(path("") { | |
handleWith((x: HttpRequest) => { | |
requests.incrementAndGet() | |
x.discardEntityBytes() | |
Future { | |
Thread.sleep(200) | |
requests.decrementAndGet() | |
HttpResponse(entity = response + "\n") | |
} | |
}) | |
})(Keep.right).run() | |
}) | |
println("Waiting for requests") | |
def loop(): Unit = { | |
println(s"State\nconnections -> $connections\nrequests -> $requests") | |
Thread.sleep(5 * 1000) | |
loop() | |
} | |
Future(blocking(loop())) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment