Skip to content

Instantly share code, notes, and snippets.

@Vigrond
Last active February 12, 2024 05:42
Show Gist options
  • Save Vigrond/2bbea9be6413415e5479998e79a1b11a to your computer and use it in GitHub Desktop.
Save Vigrond/2bbea9be6413415e5479998e79a1b11a to your computer and use it in GitHub Desktop.
Celery / Django / Redis Rate Limits done "as expected" - Simple SMTP Example
# Rate limiting with Celery + Django + Redis
# Multiple Fixed Windows Algorithm inspired by Figma https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/
# and Celery's sometimes ambiguous, vague, and one-paragraph documentation
#
# Celery's Task class is subclassed and an is_rate_okay() method is added to it.
# This first file is celery.py (or wherever the Celery app is defined in your Django project).
import os
import math
import time
from celery import Celery, Task
from django_redis import get_redis_connection
from django.conf import settings
from django.utils import timezone
# The Celery application instance; 'your_app' is the name passed to Celery
app = Celery('your_app')
# Get Redis connection from our Django 'default' cache setting.
# This module-level connection is what YourAppTask.is_rate_okay uses.
# NOTE(review): this runs at import time, so Django settings presumably must
# already be configured when this module is imported - confirm.
redis_conn = get_redis_connection("default")
# We subclass the Celery Task
class YourAppTask(Task):
    """Celery Task subclass that adds a Redis-backed rate-limit check."""

    def is_rate_okay(self, times=30, per=60):
        """
        Record one hit for this task and report whether it is within the rate limit.

        With the defaults this enforces a rate limit of 30/minute.

        times (int): The "30" in "30 times per 60 seconds".
        per (int): The "60" in "30 times per 60 seconds".

        Hits are kept in a Redis Hash stored under "rate:<task name>", with
        timestamp fields and counter values:
        {
            '1560649027.515933': '2', // unlikely to have more than 1
            '1560649352.462433': '1',
        }

        Whenever the key has no expiration, it is set to expire after 'per'
        seconds. The counters are summed and compared against 'times'.
        Every call is counted, including calls that turn out to be over the limit.

        The "leniency" described at the bottom of the Figma article referenced
        at the top of this code is not implemented; that is left up to you and
        depends on the application.

        Returns True if the total is at or under 'times', otherwise False.
        """
        # Hash field for this hit: a timestamp accurate to the microsecond
        hit_field = timezone.now().timestamp()

        # One Redis key per task name
        redis_key = f"rate:{self.name}"

        # Queue all three commands on one pipeline so they execute together
        pipeline = redis_conn.pipeline()
        pipeline.hincrby(redis_key, hit_field)  # count this hit
        pipeline.ttl(redis_key)                 # current expiration of the key
        pipeline.hvals(redis_key)               # every counter in the current window

        # Replies come back in command order: [hit counter, ttl, list of counters]
        replies = pipeline.execute()
        remaining_ttl = replies[1]
        raw_counters = replies[2]

        # A negative TTL means no expiration is set yet, so start the window now.
        # This call happens after, and separately from, the pipeline above.
        if remaining_ttl < 0:
            redis_conn.expire(redis_key, per)

        # Counters arrive as bytes, so convert each to int before totalling
        hit_total = sum(int(raw) for raw in raw_counters)
        return hit_total <= times
# Use YourAppTask as the app's Task class, so that bound tasks can call
# self.is_rate_okay() (as send_email in the SMTP example does)
app.Task = YourAppTask
# Load Celery configuration from the Django settings object, using the 'CELERY' namespace
app.config_from_object('django.conf:settings', namespace='CELERY')
# Let Celery discover task modules automatically
app.autodiscover_tasks()
...
# SMTP Example
import random
from YourApp.celery import app
from django.core.mail import EmailMessage
# max_retries=None means unlimited retries, so backlogged email tasks do not disappear
@app.task(bind=True, name='smtp.send-email', max_retries=None)
def send_email(self, to_address):
    """
    Send the example email to to_address, unless the task is over its rate limit.

    When self.is_rate_okay() reports the limit has been exceeded, the task is
    retried after a random delay instead of sending.
    """
    if not self.is_rate_okay():
        # Pick a random delay between 30 and 60 seconds
        # so that retried tasks don't all come flooding back at the same time
        retry_delay = random.randint(30, 60)
        raise self.retry(countdown=retry_delay)

    email = EmailMessage(
        subject='Hello',
        body='Body goes here',
        from_email='[email protected]',
        to=[to_address],
    )
    email.send()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment