Created
May 11, 2020 03:35
-
-
Save gabfssilva/b3795c3c214b9e4bddd27ff933dfe70f to your computer and use it in GitHub Desktop.
ThrottleHandler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import akka.actor.{Actor, ActorRef, ActorSystem, Props} | |
import akka.http.scaladsl.model.StatusCodes | |
import akka.http.scaladsl.server.{HttpApp, Route} | |
import scala.concurrent.duration._ | |
import akka.pattern._ | |
import akka.util.Timeout | |
import de.heikoseeberger.akkahttpplayjson.PlayJsonSupport | |
import main.ThrottleMessages._ | |
import play.api.libs.json.{Json, OFormat} | |
import scala.concurrent.Future | |
object Sample extends HttpApp with App with PlayJsonSupport { | |
implicit val timeout: Timeout = Timeout(1.second) | |
val sys: ActorSystem = ActorSystem() | |
val throttle = sys.actorOf(Props(new ThrottleHandler)) | |
override protected def routes: Route = get { | |
pathPrefix("increase") { | |
optionalHeaderValueByName("Remote-Address") { ip => | |
onSuccess((throttle ? SendRequest(ip.getOrElse("localhost"))).mapTo[ThrottleResult]) { | |
case Ok => | |
complete(StatusCodes.OK, "you shall pass") | |
case Reject => | |
complete(StatusCodes.TooManyRequests, "you shall not pass!") | |
} | |
} | |
} ~ pathPrefix("metrics") { | |
complete((throttle ? MetricsRequest).mapTo[List[ThrottleMetrics]]) | |
} | |
} | |
startServer("localhost", 8080, sys) | |
} | |
object ThrottleMessages { | |
case object MetricsRequest | |
case class SendRequest(ip: String) | |
case class Increase(sender: ActorRef) | |
case object Decrease | |
sealed trait ThrottleResult | |
case object Ok extends ThrottleResult | |
case object Reject extends ThrottleResult | |
case class ThrottleMetrics(ip: String, totalRejections: Int, totalAcceptations: Int) | |
object ThrottleMetrics { | |
implicit val format: OFormat[ThrottleMetrics] = Json.format[ThrottleMetrics] | |
} | |
} | |
class ThrottleHandler extends Actor { | |
implicit val timeout: Timeout = Timeout(1.second) | |
import context._ | |
var throttleReferences: Map[String, ActorRef] = Map.empty | |
override def receive: Receive = { | |
case SendRequest(ip) => | |
throttleReferences.get(ip) match { | |
case Some(actor) => | |
actor ! Increase(sender()) | |
case None => | |
// (0 to 1000).foreach { id => | |
// val actor = actorOf(Props(new ThrottleActor(ip))) | |
// throttleReferences = throttleReferences + (s"$ip-$id" -> actor) | |
// } | |
val actor = actorOf(Props(new ThrottleActor(ip))) | |
actor ! Increase(sender()) | |
throttleReferences = throttleReferences + (ip -> actor) | |
} | |
case MetricsRequest => | |
Future | |
.sequence(throttleReferences.map { case (_, actor) => (actor ? MetricsRequest).mapTo[ThrottleMetrics] }) | |
.pipeTo(sender()) | |
} | |
} | |
class ThrottleActor(ip: String) extends Actor { | |
import context._ | |
val max = 3 | |
var counter = 0 | |
var totalRejections = 0 | |
var totalAcceptations = 0 | |
def rejectMode: Receive = { | |
case MetricsRequest => | |
sender() ! ThrottleMetrics(ip, totalRejections, totalAcceptations) | |
case Decrease => | |
counter = counter - 1 | |
if (counter < max) { | |
become(acceptMode) | |
} | |
case Increase(sender) => | |
sender ! Reject | |
totalRejections = totalRejections + 1 | |
} | |
def acceptMode: Receive = { | |
case MetricsRequest => | |
sender() ! ThrottleMetrics(ip, totalRejections, totalAcceptations) | |
case Decrease => | |
counter = counter - 1 | |
case Increase(sender) => | |
counter = counter + 1 | |
sender ! Ok | |
totalAcceptations = totalAcceptations + 1 | |
system.scheduler.scheduleOnce(10.seconds, self, Decrease) | |
if (counter == max) { | |
become(rejectMode) | |
} | |
} | |
override def receive: Receive = acceptMode | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment