Last active
October 19, 2017 23:41
-
-
Save cdunklau/58b444dcde0b8e1899b0475a49e5a538 to your computer and use it in GitHub Desktop.
Constrain number of simultaneous HTTP requests with asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import itertools | |
import aiohttp | |
import async_timeout | |
async def fetch_with_response_delay(session, delay):
    """
    GET httpbin's ``/delay/<delay>`` endpoint and return the decoded JSON.

    `session` must offer ``.get(url)`` usable with ``async with`` and
    yielding a response that has an awaitable ``.json()`` (for example
    an `aiohttp.ClientSession`). `delay` is interpolated into the URL
    path. The whole request runs under a 15 second `async_timeout`.

    Raises ValueError if `delay` is not between 0 and 10 inclusive.
    """
    if not 0 <= delay <= 10:
        raise ValueError('Delay must be between 0 and 10 inclusive')
    url = 'http://httpbin.org/delay/{0}'.format(delay)
    # Enter the timeout with `async with`: the plain `with` form is
    # deprecated in async-timeout 4 and no longer supported in newer
    # releases.
    async with async_timeout.timeout(15):
        async with session.get(url) as response:
            return await response.json()
class CoroutineLimiter:
    """
    Inspired by twisted.internet.defer.DeferredSemaphore

    Runs coroutines through a shared semaphore so that at most `limit`
    of them execute their body at the same time.

    If `invoke_as_tasks` is true, wrap the invoked coroutines in Task
    objects. This will ensure that the coroutines happen in the
    same order `.invoke()` was called, if the tasks are given
    to `asyncio.gather`.

    `loop`, when given, is the event loop used to create those tasks;
    otherwise the current event loop is looked up when a task is
    created.

    Raises ValueError if `limit` is zero or negative.
    """

    def __init__(self, limit, *, loop=None, invoke_as_tasks=False):
        if limit <= 0:
            raise ValueError('Limit must be nonzero and positive')
        self._limit = limit
        # May stay None: the loop is only needed when creating tasks and
        # is then resolved lazily in `invoke`.
        self._loop = loop
        # Created on first use inside a running coroutine. The `loop`
        # argument of asyncio.Semaphore was removed in Python 3.10, and
        # building it while a loop is running binds it to that loop on
        # older versions as well.
        self._sem = None
        self._count = itertools.count(1)
        self._invoke_as_tasks = invoke_as_tasks

    def invoke(self, coro_callable, *args):
        """
        Schedule ``coro_callable(*args)`` under the concurrency limit.

        Returns a Task when `invoke_as_tasks` was set, otherwise a
        coroutine object that the caller must await.
        """
        coro = self._invoke(coro_callable, *args)
        if not self._invoke_as_tasks:
            return coro
        loop = self._loop
        if loop is None:
            loop = asyncio.get_event_loop()
        return loop.create_task(coro)

    async def _invoke(self, coro_callable, *args):
        """Wait for a free slot, run the coroutine, and return its result."""
        n = next(self._count)
        fmt = 'Acquiring semaphore for coroutine {count} with args {args}'
        print(fmt.format(count=n, args=args))
        if self._sem is None:
            # No await between the check and the assignment, so only one
            # semaphore is ever created.
            self._sem = asyncio.Semaphore(self._limit)
        # `async with` releases the semaphore on every exit path,
        # including a failure in the progress message below.
        async with self._sem:
            fmt = 'Semaphore acquired. Invoking coroutine {count} with args {args}'
            print(fmt.format(count=n, args=args))
            try:
                return await coro_callable(*args)
            finally:
                print('Coroutine {count} finished, releasing semaphore'.format(
                    count=n,
                ))
async def run(loop):
    """
    Fetch nine delayed httpbin responses, at most four at a time.

    `loop` is forwarded to the CoroutineLimiter for task creation and
    should be the loop that is running this coroutine. The gathered
    results are printed once all requests have finished.
    """
    delays = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    limiter = CoroutineLimiter(4, loop=loop, invoke_as_tasks=True)
    # No `loop=` for ClientSession or gather: the argument is deprecated
    # in aiohttp and was removed from asyncio.gather in Python 3.10.
    # Both pick up the running loop on their own.
    async with aiohttp.ClientSession() as session:
        tasks = [
            limiter.invoke(fetch_with_response_delay, session, n)
            for n in delays
        ]
        results = await asyncio.gather(*tasks)
    print('finished! results: {0}'.format(results))
def main():
    """Drive `run()` to completion on a fresh event loop, then close it."""
    # A new loop avoids the deprecated implicit loop creation of
    # asyncio.get_event_loop(). It is registered as current so that code
    # calling get_event_loop() finds it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run(loop))
    finally:
        # Do not leave a closed loop registered as the current one.
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections | |
import aiohttp | |
import async_timeout | |
async def fetch_with_response_delay(session, delay):
    """
    GET httpbin's ``/delay/<delay>`` endpoint and return the decoded JSON.

    `session` must offer ``.get(url)`` usable with ``async with`` and
    yielding a response that has an awaitable ``.json()`` (for example
    an `aiohttp.ClientSession`). `delay` is interpolated into the URL
    path. The whole request runs under a 15 second `async_timeout`.

    Raises ValueError if `delay` is not between 0 and 10 inclusive.
    """
    if not 0 <= delay <= 10:
        raise ValueError('Delay must be between 0 and 10 inclusive')
    url = 'http://httpbin.org/delay/{0}'.format(delay)
    # Enter the timeout with `async with`: the plain `with` form is
    # deprecated in async-timeout 4 and no longer supported in newer
    # releases.
    async with async_timeout.timeout(15):
        async with session.get(url) as response:
            return await response.json()
async def simple_limiter(futures, *, limit):
    """
    Run the given awaitables with at most `limit` in flight at once.

    `futures` is an iterable of coroutines or futures. Each one is
    wrapped with `asyncio.ensure_future` only when a slot is free, so
    coroutines do not start before their turn. Progress is printed
    before and after every wait.

    Returns the list of results in the same order as `futures`; an
    empty input gives an empty list. All awaitables are run to
    completion first; if any of them failed, the exception of the
    earliest failing one (in input order) is raised when the results
    are collected.

    Raises ValueError if `limit` is zero or negative.
    """
    if limit <= 0:
        raise ValueError('Limit must be nonzero and positive')
    remaining = collections.deque(futures)
    started = []  # every task, in input order, for result collection
    pending = set()
    while remaining or pending:
        # Top up to the limit. Bounded by what is left, so fewer
        # awaitables than `limit` is fine.
        while remaining and len(pending) < limit:
            # asyncio.wait rejects bare coroutines on Python 3.11+, so
            # wrap each one in a Task.
            task = asyncio.ensure_future(remaining.popleft())
            started.append(task)
            pending.add(task)
        print('Will wait on {n} futures from total of {tot}'.format(
            n=len(pending), tot=len(pending)+len(remaining)))
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        print('Got {nd} done and {np} pending, total left {tot}'.format(
            nd=len(done), np=len(pending), tot=len(pending)+len(remaining)))
    return [task.result() for task in started]
async def run(loop):
    """
    Fetch nine delayed httpbin responses through `simple_limiter`.

    At most four requests are in flight at once. `loop` is accepted for
    the caller's convenience but is not needed here: the session uses
    the loop that is running this coroutine. Whatever `simple_limiter`
    returns is printed at the end.
    """
    delays = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    # No `loop=` for ClientSession: the argument is deprecated in
    # aiohttp and the running loop is used automatically.
    async with aiohttp.ClientSession() as session:
        pending_fetches = [
            fetch_with_response_delay(session, n)
            for n in delays
        ]
        print('starting...')
        results = await simple_limiter(pending_fetches, limit=4)
    print('finished! results: {0}'.format(results))
def main():
    """Drive `run()` to completion on a fresh event loop, then close it."""
    # A new loop avoids the deprecated implicit loop creation of
    # asyncio.get_event_loop(). It is registered as current so that code
    # calling get_event_loop() finds it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run(loop))
    finally:
        # Do not leave a closed loop registered as the current one.
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment