Skip to content

Instantly share code, notes, and snippets.

@hallazzang
Last active August 8, 2022 09:19
Show Gist options
  • Save hallazzang/d828e36ea6cf7ff476d033da0f15dec5 to your computer and use it in GitHub Desktop.
Save hallazzang/d828e36ea6cf7ff476d033da0f15dec5 to your computer and use it in GitHub Desktop.
Example of how to implement throttling in Asyncio/Aiohttp

Example of how to implement throttling in Asyncio/Aiohttp

This gist consists of 4 files:

throttler.py - Throttler class implementation
asyncio_throttling_example.py - Throttling general coroutines
aiohttp_throttling_example_client.py - Throttling aiohttp requests
aiohttp_throttling_example_server.py - Web server for testing aiohttp_throttling_example_client.py

The main Throttler class's implementation is easy to understand, so take a look at it. It (probably) supports Python >= 3.5.

Tutorial

Creating a Throttler instance

You can create a Throttler instance with the desired rate limit:

from throttler import Throttler

throttler = Throttler(50)  # Limit rate to 50 tasks/sec

with statement with multiple context expressions

You can use multiple context expressions in a single with statement (of course, we'll use async with in this case):

async with throttler, session.get('http://httpbin.org') as resp:
    print(await resp.text())
import sys
import signal
import random
import asyncio
from collections import deque
import aiohttp
from throttler import Throttler
# Event-loop timestamps of recent requests: worker() appends one per request,
# main() drops entries older than one second.
request_logs = deque()
async def worker(session, throttler, worker_no):
    """Send throttled POST requests to the local test API in an endless loop.

    Each pass waits for ``throttler`` to admit it, records the current
    event-loop time in the module-level ``request_logs`` and then POSTs to
    ``http://127.0.0.1:8080/api``; the response is discarded.

    Args:
        session: Object whose ``post(url)`` returns an async context manager.
        throttler: Async context manager that gates each request.
        worker_no: Worker index; accepted but not used by the body.
    """
    sent = 0
    while True:
        async with throttler:
            started_at = asyncio.get_event_loop().time()
            request_logs.append(started_at)
            sent += 1
            # Only the round trip matters; the response body is never read.
            async with session.post('http://127.0.0.1:8080/api'):
                pass
async def main(loop):
    """Spawn throttled HTTP workers and report the observed request rate.

    Starts ``num_of_workers`` ``worker`` tasks sharing one ``Throttler`` and
    one ``aiohttp.ClientSession``, then loops forever: every 0.1 s it drops
    timestamps older than one second from ``request_logs`` and prints how
    many remain.  Once that count has exceeded ``rate_limit`` the status
    stays 'EXCEEDED' for the rest of the run.

    Args:
        loop: Event loop used to create the worker tasks and as the clock.
    """
    rate_limit = 50
    num_of_workers = 100
    rate_limit_exceeded = False

    throttler = Throttler(rate_limit, loop=loop)

    # ClientSession has to be entered with ``async with``; the plain
    # ``with`` form is rejected by aiohttp 3.x.
    async with aiohttp.ClientSession() as session:
        # Keep references to the tasks so they cannot be garbage-collected
        # while running and so they can be shut down below.
        workers = [
            loop.create_task(worker(session, throttler, worker_no))
            for worker_no in range(num_of_workers)
        ]
        try:
            while True:
                now = loop.time()
                # Same sliding one-second window the Throttler uses
                while request_logs and now - request_logs[0] > 1.0:
                    request_logs.popleft()

                if len(request_logs) > rate_limit:
                    rate_limit_exceeded = True

                message = 'Rate: %4d req/s (%s)' % (
                    len(request_logs),
                    'PASSING' if not rate_limit_exceeded else 'EXCEEDED')
                print('\r%-35s' % message, end='')
                await asyncio.sleep(0.1)
        finally:
            # Stop the workers before the session they use is closed.
            for task in workers:
                task.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Add signal handler to handle Ctrl+C quit gracefully
    def signal_handler():
        """Cancel every task on ``loop`` so run_until_complete() unwinds."""
        # asyncio.Task.all_tasks() was removed in Python 3.9; only fall back
        # to it on interpreters that predate asyncio.all_tasks() (3.7).
        all_tasks = (getattr(asyncio, 'all_tasks', None)
                     or asyncio.Task.all_tasks)
        for task in all_tasks(loop):
            task.cancel()

    loop.add_signal_handler(signal.SIGINT, signal_handler)
    loop.add_signal_handler(signal.SIGTERM, signal_handler)

    try:
        loop.run_until_complete(main(loop))
    except asyncio.CancelledError:
        print('\nCancelled')
    finally:
        loop.close()
import time
import asyncio
from collections import deque
from aiohttp import web
# time.time() stamps of received requests; api() appends, rate_monitor()
# drops entries older than one second.
request_logs = deque()
rate_limit = 50 # Per second
# Set to True by rate_monitor() once the limit has been exceeded; nothing in
# this file resets it.
rate_limit_exceeded = False
async def api(request):
    """Record the arrival time of a request and report the API status.

    Appends ``time.time()`` to the module-level ``request_logs`` and answers
    with 'API works', or 'API died' once ``rate_limit_exceeded`` is set.
    ``request`` itself is not inspected.
    """
    request_logs.append(time.time())
    body = 'API died' if rate_limit_exceeded else 'API works'
    return web.Response(text=body)
async def rate_monitor():
    """Print the request rate of the last second every 0.1 s until cancelled.

    Each pass drops entries older than one second from ``request_logs``,
    sets the global ``rate_limit_exceeded`` flag when more than
    ``rate_limit`` entries remain, and rewrites the status line in place.
    Cancellation ends the loop quietly so the task can simply be awaited.
    """
    global rate_limit_exceeded

    try:
        while True:
            checked_at = time.time()
            # Slide the one-second window forward.
            while request_logs and checked_at - request_logs[0] > 1.0:
                request_logs.popleft()

            in_window = len(request_logs)
            if in_window > rate_limit:
                rate_limit_exceeded = True

            status = 'Rate: %4d req/s' % in_window
            print('\r%-25s' % status, end='')
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        pass
async def start_rate_monitor(app):
    """Start ``rate_monitor()`` as a background task and keep it on the app.

    The task is stored under ``app['rate_monitor']``.

    Args:
        app: Mapping-like application object that receives the task.
    """
    # ``Application.loop`` is deprecated in aiohttp 3.x.  This coroutine is
    # already executing on an event loop, so schedule the task on that one.
    app['rate_monitor'] = asyncio.ensure_future(rate_monitor())
async def cleanup_rate_monitor(app):
    """Cancel the task stored under ``app['rate_monitor']`` and wait for it.

    Args:
        app: Mapping-like application object holding the task.
    """
    monitor_task = app['rate_monitor']
    monitor_task.cancel()
    # Awaiting lets the task finish handling its cancellation before returning.
    await monitor_task
# Build the test server application.
app = web.Application()
# Run the rate monitor for the lifetime of the app: started on startup,
# cancelled and awaited on cleanup.
app.on_startup.append(start_rate_monitor)
app.on_cleanup.append(cleanup_rate_monitor)
# POST /api is handled by api().
app.router.add_post('/api', api)
# Serve on 127.0.0.1:8080.  Runs whenever this module is executed or
# imported (there is no __main__ guard).
web.run_app(app, host='127.0.0.1', port=8080)
import sys
import signal
import random
import asyncio
from collections import deque
from throttler import Throttler
# Event-loop timestamps of recent simulated requests: worker() appends one
# per job, main() drops entries older than one second.
request_logs = deque()
async def worker(throttler, worker_no):
    """Simulate throttled jobs in an endless loop.

    Each pass waits for ``throttler`` to admit it, records the current
    event-loop time in the module-level ``request_logs`` and then sleeps for
    0.5 to 1.5 seconds to stand in for real work.

    Args:
        throttler: Async context manager that gates each job.
        worker_no: Worker index, only needed by the optional debug output.
    """
    started = 0
    while True:
        async with throttler:
            # print('Worker #%d - fetching...' % worker_no)
            started_at = asyncio.get_event_loop().time()
            request_logs.append(started_at)
            started += 1
            job_seconds = 0.5 + random.random()
            await asyncio.sleep(job_seconds)
            # print('Worker #%d - fetched! (total %d)' % (worker_no, started))
async def main(loop):
    """Spawn throttled workers and report the observed job start rate.

    Starts ``num_of_workers`` ``worker`` tasks sharing one ``Throttler``,
    then loops forever: every 0.1 s it drops timestamps older than one
    second from ``request_logs`` and prints how many remain.  Once that
    count has exceeded ``rate_limit`` the status stays 'EXCEEDED' for the
    rest of the run.

    Args:
        loop: Event loop used to create the worker tasks and as the clock.
    """
    rate_limit = 25
    num_of_workers = 30
    rate_limit_exceeded = False

    throttler = Throttler(rate_limit, loop=loop)

    # Keep references to the tasks so they cannot be garbage-collected while
    # running and so they can be shut down below.
    workers = [
        loop.create_task(worker(throttler, no))
        for no in range(num_of_workers)
    ]
    try:
        while True:
            now = loop.time()
            # Same sliding one-second window the Throttler uses
            while request_logs and now - request_logs[0] > 1.0:
                request_logs.popleft()

            if len(request_logs) > rate_limit:
                rate_limit_exceeded = True

            message = 'Rate: %4d req/s (%s)' % (
                len(request_logs),
                'PASSING' if not rate_limit_exceeded else 'EXCEEDED')
            print('\r%-35s' % message, end='')
            await asyncio.sleep(0.1)
    finally:
        # Do not leave worker tasks pending when the monitor loop ends.
        for task in workers:
            task.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Add signal handler to handle Ctrl+C quit gracefully
    def signal_handler():
        """Cancel every task on ``loop`` so run_until_complete() unwinds."""
        # asyncio.Task.all_tasks() was removed in Python 3.9; only fall back
        # to it on interpreters that predate asyncio.all_tasks() (3.7).
        all_tasks = (getattr(asyncio, 'all_tasks', None)
                     or asyncio.Task.all_tasks)
        for task in all_tasks(loop):
            task.cancel()

    loop.add_signal_handler(signal.SIGINT, signal_handler)
    loop.add_signal_handler(signal.SIGTERM, signal_handler)

    try:
        loop.run_until_complete(main(loop))
    except asyncio.CancelledError:
        print('\nCancelled')
    finally:
        loop.close()
import asyncio
from collections import deque
class Throttler:
    """Simple throttler for asyncio.

    Use it as an async context manager: ``async with throttler: ...`` waits
    until fewer than ``rate_limit`` entries were admitted within the last
    second, records the new entry's start time and then lets the body run.

    Args:
        rate_limit: Maximum number of entries admitted per one-second
            sliding window.  Must be greater than zero.
        retry_interval: Seconds to sleep between checks while the window is
            full.
        loop: Object whose ``time()`` method supplies the clock; defaults to
            ``asyncio.get_event_loop()``.

    Raises:
        ValueError: If ``rate_limit`` is not greater than zero.
    """

    def __init__(self, rate_limit, retry_interval=0.001, loop=None):
        if rate_limit <= 0:
            # A non-positive limit can never be satisfied, so __aenter__
            # would poll forever; fail fast instead.
            raise ValueError('rate_limit must be greater than zero')

        self.rate_limit = rate_limit
        self.retry_interval = retry_interval
        self.loop = loop or asyncio.get_event_loop()

        # Start times of the entries admitted during the current window,
        # oldest first.
        self._task_logs = deque()

    async def __aenter__(self):
        """Wait for a free slot in the window, claim it and return ``self``."""
        while True:
            now = self.loop.time()

            # Pop items (which are start times) that are no longer in the
            # time window
            while self._task_logs and now - self._task_logs[0] > 1.0:
                self._task_logs.popleft()

            # Exit the polling loop when a new task can be processed
            if len(self._task_logs) < self.rate_limit:
                break

            await asyncio.sleep(self.retry_interval)

        # Push new task's start time
        self._task_logs.append(self.loop.time())

        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Do nothing; the slot expires on its own and exceptions propagate."""
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment