Skip to content

Instantly share code, notes, and snippets.

@rendarz
Created May 29, 2020 15:54
Show Gist options
  • Save rendarz/2dbc5441167260931b04a1c1a2d4b0fd to your computer and use it in GitHub Desktop.
Save rendarz/2dbc5441167260931b04a1c1a2d4b0fd to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
class AsyncThrottle():
DEFAULT_THROTTLE = 3
def __init__(self):
self.throttle = {}
def set(self, key, throttle=None):
if throttle is None: throttle = self.DEFAULT_THROTTLE
if key not in self.throttle:
self.throttle[key] = asyncio.Semaphore(throttle)
return True
return False
async def get(self, key, coro, throttle=None):
if throttle is None: throttle = self.DEFAULT_THROTTLE
if key not in self.throttle:
self.throttle[key] = asyncio.Semaphore(throttle)
async with self.throttle[key]:
result = await coro
return result
async def get_url(url):
print(f"GET URL: {url} ...")
await asyncio.sleep(3)
print(f"GOT IT! {url}.")
return 1
async def get_throttled_urls(urls, throttle):
requests = []
for url in urls:
requests.append(throttle.get(key=url, coro=get_url(url)))
results = await asyncio.gather(*requests, return_exceptions=True)
return results
async def my_data_grabber1(throttle):
results = []
result = await get_throttled_urls(("abc",), throttle)
results.append(result)
result = await get_throttled_urls(("xxx",), throttle)
results.append(result)
result = await get_throttled_urls(("abc", "ooo", "ooo","ooo","ooo","ooo","ooo","ooo","ooo","ooo","ooo","ooo","ooo","ooo",), throttle)
results.append(result)
result = await get_throttled_urls(("abc",), throttle)
results.append(result)
result = await get_throttled_urls(("abc",), throttle)
results.append(result)
return results
async def my_data_grabber2(throttle):
results = []
result = await get_throttled_urls(("xxx",), throttle)
results.append(result)
result = await get_throttled_urls(("xxx",), throttle)
results.append(result)
result = await get_throttled_urls(("xxx",), throttle)
results.append(result)
result = await get_throttled_urls(("xxx",), throttle)
results.append(result)
result = await get_throttled_urls(("xxx",), throttle)
results.append(result)
return results
async def main():
throttle = AsyncThrottle()
requests = (get_throttled_urls(("abc", "eee", "abc", "abc", "eee", "abc"), throttle),
my_data_grabber1(throttle),
my_data_grabber2(throttle),my_data_grabber2(throttle),my_data_grabber2(throttle),my_data_grabber2(throttle),my_data_grabber2(throttle))
results = await asyncio.gather(*requests, return_exceptions=True)
print(results)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment