Last active
January 15, 2022 21:20
-
-
Save miohtama/86c86b4caa61a5615e44 to your computer and use it in GitHub Desktop.
Rolling window rate limitation implementation for Pyramid
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Rolling time window counter and rate limit using Redis. | |
Use Redis sorted sets to do a rolling time window counters and limiters. These are useful for preventing denial of service, flood and reputation attack against site elements which trigegr outgoing action (email, SMS). | |
Example how to do a Colander validator which checks that the form has not been submitted too many times within the time period:: | |
import colander as s | |
@c.deferred | |
def throttle_invites_validator(node, kw): | |
"Protect invite functionality from flood attacks." | |
request = kw["request"] | |
limit = int(request.registry.settings.get("trees.invite_limit", 60)) | |
def inner(node, value): | |
# Check we don't have many invites going out | |
if rollingwindow.check(request.registry, "invite_friends", window=3600, limit=limit): | |
# Alert devops through Sentry | |
logger.warn("Excessive invite traffic") | |
# Tell users slow down | |
raise c.Invalid(node, 'Too many outgoing invites at the moment. Please try again later.') | |
return inner | |
Then you construct form:: | |
schema = schemas.InviteFriends(validator=schemas.throttle_invites_validator).bind(request=request) | |
form = deform.Form(schema) | |
You can also exercise this code in tests:: | |
def test_flood_invite(web_server, browser, dbsession, init): | |
"Overload invites and see we get an error message." | |
b = browser | |
with transaction.manager: | |
create_user() | |
# Set flood limit to two attempts | |
init.config.registry.settings["trees.invite_limit"] = "2" | |
# Clear Redis counter for outgoing invitations | |
redis = get_redis(init.config.registry) | |
redis.delete("invite_friends") | |
# Login | |
b.visit(web_server + "/login") | |
b.fill("username", EMAIL) | |
b.fill("password", PASSWORD) | |
b.find_by_name("Log_in").click() | |
def flood(): | |
b.visit("{}/invite-friends".format(web_server)) | |
b.find_by_css("#nav-invite-friends").click() | |
b.fill("phone_number", "555 123 1234") | |
b.find_by_name("invite").click() | |
flood() | |
assert b.is_text_present("Invite SMS sent") | |
flood() | |
assert b.is_text_present("Invite SMS sent") | |
flood() | |
assert b.is_text_present("Too many outgoing invites at the moment") | |
More info | |
* http://opensourcehacker.com/2014/07/09/rolling-time-window-counters-with-redis-and-mitigating-botnet-driven-login-attacks/ | |
* http://redis.io/commands/zadd | |
""" | |
import time | |
from websauna.system.core.redis import get_redis | |
def _check(redis, key, window=60, limit=50): | |
# Expire old keys (hits) | |
expires = time.time() - window | |
redis.zremrangebyscore(key, '-inf', expires) | |
# Add a hit on the very moment | |
now = time.time() | |
redis.zadd(key, now, now) | |
# If we currently have more keys than limit, | |
# then limit the action | |
if redis.zcard(key) > limit: | |
return True | |
return False | |
def _get(redis, key): | |
""" Get the current hits per rolling time window. | |
:param redis: Redis client | |
:param key: Redis key name we use to keep counter | |
:return: int, how many hits we have within the current rolling time window | |
""" | |
return redis.zcard(key) | |
def check(registry, key, window=60, limit=10): | |
"""Do a rolling time window counter hit. | |
Use ``key`` to store the current hit rate in Redis. | |
:param registry: Pyramid registry e.g. request.registry | |
:param key: Redis key name we use to keep counter | |
:param window: Rolling time window in seconds. Default 60 seconds. | |
:param limit: Allowed operations per time window. Default 10 hits. | |
:return: True is the maximum limit has been reached for the current time window | |
""" | |
redis = get_redis(registry) | |
return _check(redis, key, window, limit) | |
def get(registry, key): | |
"""Get the current hits per rolling time window. | |
Use ``key`` to store the current hit rate in Redis. | |
:param registry: Pyramid registry e.g. request.registry | |
:param key: Redis key name we use to keep counter | |
:return: int, how many hits we have within the current rolling time window | |
""" | |
redis = get_redis(registry) | |
return _check(redis, key) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Deform throttling support.""" | |
import logging | |
import colander as c | |
from . import rollingwindow | |
logger = logging.getLogger(__name__) | |
def create_throttle_validator(name:str, max_actions_in_time_window:int, time_window_in_seconds:int=3600): | |
"""Creates a Colander form validator which prevents form submissions exceed certain rate. | |
Form submissions are throttled system wide. This prevents abuse of the system by flooding it with requests. | |
A logging warning is issued if the rate is exceeded. The user is greeted with an error message telling the submission is not possible at the moment. | |
Example:: | |
from tomb_routes import simple_route | |
from websauna.system.form.throttle import create_throttle_validator | |
from myapp import schemas | |
@simple_route("/login", route_name="login", renderer="login/login.html", append_slash=False) | |
def login(request): | |
# Read allowed email login rate from the config file | |
email_login_rate = int(request.registry.settings.get("trees.email_login_rate", 50)) | |
# Create a Colander schema instance with rate limit validator | |
email_schema = schemas.LoginWithEmail(validator=create_throttle_validator("email_login", email_login_rate)).bind(request=request) | |
:param name: Identify this throttler. Used as a Redis key. | |
:param max_actions_in_time_window: Number of allowed actions per window | |
:param time_window_in_seconds: Time in window in seconds. Default one hour, 3600 seconds. | |
:return: Function to be passed to ``validator`` Colander schema construction parameter. | |
""" | |
@c.deferred | |
def throttle_validator(node, kw): | |
"""Protect invite functionality from flood attacks.""" | |
request = kw["request"] | |
limit = max_actions_in_time_window | |
def inner(node, value): | |
# Check we don't have many invites going out | |
if rollingwindow.check(request.registry, "throttle_" + name, window=time_window_in_seconds, limit=limit): | |
# Alert devops through Sentry | |
logger.warn("Excessive form submissions on %s", name) | |
# Tell users slow down | |
raise c.Invalid(node, 'Too many form submissions at the moment. Please try again later.') | |
return inner | |
return throttle_validator |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment