Skip to content

Instantly share code, notes, and snippets.

@gabfssilva
Created May 11, 2020 03:35
Show Gist options
  • Save gabfssilva/b3795c3c214b9e4bddd27ff933dfe70f to your computer and use it in GitHub Desktop.
Save gabfssilva/b3795c3c214b9e4bddd27ff933dfe70f to your computer and use it in GitHub Desktop.
ThrottleHandler
package main
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.{HttpApp, Route}
import scala.concurrent.duration._
import akka.pattern._
import akka.util.Timeout
import de.heikoseeberger.akkahttpplayjson.PlayJsonSupport
import main.ThrottleMessages._
import play.api.libs.json.{Json, OFormat}
import scala.concurrent.Future
/**
 * HTTP entry point for the throttling demo.
 *
 * Starts an actor system with a single [[ThrottleHandler]] and serves two GET
 * endpoints on localhost:8080:
 *
 *  - `/increase` asks the handler whether the calling client may proceed and
 *    answers 200 or 429 accordingly;
 *  - `/metrics` returns the per-client counters gathered by the handler.
 */
object Sample extends HttpApp with App with PlayJsonSupport {
  // Upper bound for every ask issued from the routes below.
  implicit val timeout: Timeout = Timeout(1.second)

  val sys: ActorSystem = ActorSystem()
  val throttle = sys.actorOf(Props(new ThrottleHandler))

  /** Asks the throttle handler for a verdict on one request from `client`. */
  private def verdictFor(client: String): Future[ThrottleResult] =
    (throttle ? SendRequest(client)).mapTo[ThrottleResult]

  /** Asks the throttle handler for the metrics of every known client. */
  private def fetchMetrics: Future[List[ThrottleMetrics]] =
    (throttle ? MetricsRequest).mapTo[List[ThrottleMetrics]]

  /** Turns a throttle verdict into the HTTP response sent to the caller. */
  private def respond(verdict: ThrottleResult): Route = verdict match {
    case Ok =>
      complete(StatusCodes.OK, "you shall pass")
    case Reject =>
      complete(StatusCodes.TooManyRequests, "you shall not pass!")
  }

  /** `/increase`: the client is identified by the `Remote-Address` header, or "localhost" when absent. */
  private def increaseRoute: Route =
    pathPrefix("increase") {
      optionalHeaderValueByName("Remote-Address") { maybeAddress =>
        onSuccess(verdictFor(maybeAddress.getOrElse("localhost"))) { result =>
          respond(result)
        }
      }
    }

  /** `/metrics`: JSON list of the counters kept for each client. */
  private def metricsRoute: Route =
    pathPrefix("metrics") {
      complete(fetchMetrics)
    }

  override protected def routes: Route = get {
    increaseRoute ~ metricsRoute
  }

  startServer("localhost", 8080, sys)
}
/**
 * Message protocol shared by the HTTP layer, [[ThrottleHandler]] and
 * [[ThrottleActor]].
 *
 * Case classes are declared `final` so they cannot be extended, which keeps
 * their generated `equals`/`hashCode`/pattern matching well-defined.
 */
object ThrottleMessages {

  /**
   * Request for metrics. A [[ThrottleActor]] answers with a single
   * [[ThrottleMetrics]]; [[ThrottleHandler]] answers with the collected
   * metrics of all its per-client actors.
   */
  case object MetricsRequest

  /** Sent to [[ThrottleHandler]] to register one incoming request from the client identified by `ip`. */
  final case class SendRequest(ip: String)

  /**
   * Sent to a [[ThrottleActor]] to count one request.
   *
   * @param sender the actor that receives the resulting [[ThrottleResult]]
   */
  final case class Increase(sender: ActorRef)

  /** Self-scheduled by a [[ThrottleActor]] to release one previously accepted request. */
  case object Decrease

  /** Verdict returned for a counted request. */
  sealed trait ThrottleResult

  /** The request is within the limit. */
  case object Ok extends ThrottleResult

  /** The request exceeds the limit. */
  case object Reject extends ThrottleResult

  /**
   * Counters reported by the throttle actor of one client.
   *
   * @param ip                identifier of the client the counters belong to
   * @param totalRejections   number of requests answered with [[Reject]]
   * @param totalAcceptations number of requests answered with [[Ok]]
   */
  final case class ThrottleMetrics(ip: String, totalRejections: Int, totalAcceptations: Int)

  object ThrottleMetrics {
    /** play-json (de)serializer derived from the case class fields. */
    implicit val format: OFormat[ThrottleMetrics] = Json.format[ThrottleMetrics]
  }
}
/**
 * Routes throttle requests to one child [[ThrottleActor]] per client.
 *
 * Handles two messages:
 *
 *  - `SendRequest(ip)`: forwards an `Increase` to the child responsible for
 *    `ip`, creating and registering that child on first use. The child
 *    replies directly to the original sender.
 *  - `MetricsRequest`: asks every registered child for its metrics and pipes
 *    the combined result back to the sender as a `List[ThrottleMetrics]`.
 */
class ThrottleHandler extends Actor {
  // Upper bound for the asks sent to the children when collecting metrics.
  implicit val timeout: Timeout = Timeout(1.second)

  import context._

  // Client identifier -> child actor keeping that client's counters.
  var throttleReferences: Map[String, ActorRef] = Map.empty

  /** Creates the child actor for `ip` and records it in `throttleReferences`. */
  private def registerThrottler(ip: String): ActorRef = {
    val created = actorOf(Props(new ThrottleActor(ip)))
    throttleReferences = throttleReferences + (ip -> created)
    created
  }

  override def receive: Receive = {
    case SendRequest(ip) =>
      // Capture the requester so the child can answer it directly.
      val replyTo = sender()
      val throttler = throttleReferences.getOrElse(ip, registerThrottler(ip))
      throttler ! Increase(replyTo)

    case MetricsRequest =>
      val replyTo = sender()
      // Build a List explicitly: the requester casts the reply with
      // mapTo[List[ThrottleMetrics]], so the concrete collection type of the
      // reply must not depend on what Map.map happens to produce.
      val pending: List[Future[ThrottleMetrics]] =
        throttleReferences.values.toList.map { child =>
          (child ? MetricsRequest).mapTo[ThrottleMetrics]
        }
      Future.sequence(pending).pipeTo(replyTo)
  }
}
/**
 * Keeps the throttle state of a single client.
 *
 * The actor starts in `acceptMode`. Every accepted `Increase` raises `counter`
 * and schedules a `Decrease` to itself 10 seconds later. Once `counter`
 * reaches `max` the actor switches to `rejectMode`, where every `Increase` is
 * answered with `Reject` until a `Decrease` brings `counter` below `max` again.
 *
 * @param ip identifier of the client, reported back in the metrics
 */
class ThrottleActor(ip: String) extends Actor {
  import context._

  // Number of accepted requests that may be outstanding at the same time.
  val max = 3
  // Accepted requests whose scheduled Decrease has not arrived yet.
  var counter = 0
  var totalRejections = 0
  var totalAcceptations = 0

  /** Current counters of this client, as sent in reply to `MetricsRequest`. */
  private def snapshot: ThrottleMetrics =
    ThrottleMetrics(ip, totalRejections, totalAcceptations)

  /** Behavior while the limit is reached: every request is rejected. */
  def rejectMode: Receive = {
    case Increase(replyTo) =>
      replyTo ! Reject
      totalRejections += 1

    case Decrease =>
      counter -= 1
      // A slot was released; start accepting again.
      if (counter < max) become(acceptMode)

    case MetricsRequest =>
      sender() ! snapshot
  }

  /** Behavior while below the limit: every request is accepted and counted. */
  def acceptMode: Receive = {
    case Increase(replyTo) =>
      counter += 1
      replyTo ! Ok
      totalAcceptations += 1
      // Release this request's slot after the throttle window has passed.
      system.scheduler.scheduleOnce(10.seconds, self, Decrease)
      if (counter == max) become(rejectMode)

    case Decrease =>
      counter -= 1

    case MetricsRequest =>
      sender() ! snapshot
  }

  override def receive: Receive = acceptMode
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment