Skip to content

Instantly share code, notes, and snippets.

@rendarz
Created May 26, 2020 13:58
Show Gist options
  • Save rendarz/96653f9e162fe547860128ccf85bb716 to your computer and use it in GitHub Desktop.
Save rendarz/96653f9e162fe547860128ccf85bb716 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
class HTTPManager:
    """Issue (simulated) GET requests with a per-URL concurrency limit.

    Each distinct URL gets its own ``asyncio.Semaphore``, kept in
    ``self.cache`` and created lazily on the first request for that URL.
    """

    def __init__(self):
        # Maps a URL to the semaphore that gates requests for that URL.
        self.cache = {}

    async def get_throttled_url(self, url, throttle=2):
        """Fetch *url* while holding that URL's semaphore.

        Args:
            url: Key identifying the resource; also the semaphore cache key.
            throttle: Initial value of the semaphore, i.e. how many requests
                for this URL may run at once. Only used the first time a
                given URL is seen; later calls reuse the cached semaphore.

        Returns:
            The constant ``3``. The value produced by ``get_url`` is
            discarded.
        """
        try:
            gate = self.cache[url]
        except KeyError:
            # First request for this URL: create and remember its semaphore.
            gate = self.cache[url] = asyncio.Semaphore(throttle)

        async with gate:
            await self.get_url(url)
        return 3

    async def get_url(self, url):
        """Announce the request, sleep for two seconds, then return ``2``.

        The sleep stands in for the network round trip; no real HTTP call
        is made.
        """
        print(f'GET {url} ...')
        await asyncio.sleep(2)
        print(f'GOT IT: {url}.')
        return 2
# Python 3.7+
async def main():
    """Run the throttled-fetch demo and print every finished task.

    Schedules one ``get_throttled_url`` call per entry in the demo URL list,
    waits until all of them are done, and prints each completed
    ``asyncio.Task`` (its repr includes the task's result).
    """
    http = HTTPManager()
    urls = ("abc", "xyz", "abc", "eee", "abc", "eee", "xyz", "xyz", "abc", "abc", "xyz", "eee")
    # asyncio.wait() no longer accepts bare coroutine objects (deprecated in
    # 3.8, TypeError since 3.11), so each coroutine is wrapped in a Task.
    todo = [asyncio.create_task(http.get_throttled_url(url)) for url in urls]
    res, _ = await asyncio.wait(todo)
    # `res` is a set, so the tasks are printed in no particular order.
    for r in res:
        print(r)


if __name__ == "__main__":
    # asyncio.run() creates the event loop, runs main() to completion and
    # closes the loop, replacing the manual get_event_loop()/close() pair.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment