@acarl005
Last active August 22, 2018 20:14
How to use Python to run an async task over a variable number of inputs, using a pool approach to limit concurrency. Similar to Promise.map in Bluebird (a JavaScript promise library).
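The concurrency pool is built on trio.CapacityLimiter: at most a fixed number of tasks may hold the limiter at once, and any further tasks wait for a slot. Below is a minimal, standalone sketch of just that mechanism (the demo/worker names, the task count, and the sleep duration are illustrative only, not part of the gist):

import trio

async def demo():
    limit = trio.CapacityLimiter(3)          # at most 3 tasks inside the limited block at once

    async def worker(n):
        async with limit:                    # waits here if 3 workers are already running
            print("start", n)
            await trio.sleep(1)
            print("done", n)

    async with trio.open_nursery() as nursery:
        for n in range(10):                  # start 10 tasks; only 3 make progress at a time
            nursery.start_soon(worker, n)

trio.run(demo)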
import trio # async task synchronization library
import asks # async version of `requests`
from inspect import iscoroutinefunction
import requests
async def concurrent_map(inputs, task, conc_limit=1000):
    """
    Like `map`, except async. Runs an async function for each item in an iterable
    and returns a list of the outputs in the original order.
    args:
        inputs - iterable of items which are passed to the async function
        task - an async function
        conc_limit - limit on the number of tasks running concurrently at any time
    returns:
        list of the return values for each task, preserving the order
    """
    results = [None] * len(inputs)
    async with trio.open_nursery() as nursery:
        limit = trio.CapacityLimiter(conc_limit)
        for i, item in enumerate(inputs):
            nursery.start_soon(_work, limit, task, item, i, results)
    # the `async with` block exits only after every task has finished
    return results


async def _work(limit, task, item, i, results):
    """helper for concurrent_map"""
    async with limit:  # acquire a slot; at most `conc_limit` tasks hold the limiter at once
        print('getting', i)
        if iscoroutinefunction(task):
            output = await task(item, i)
        else:
            # run blocking (synchronous) functions in a worker thread so they
            # don't block the event loop
            # (renamed to trio.to_thread.run_sync in newer trio releases)
            output = await trio.run_sync_in_worker_thread(task, item, i)
        results[i] = output
        print('finished', i)


if __name__ == "__main__":
    # some URLs to fetch
    URLs = [
        "https://google.com/",
        "https://en.wikipedia.org/",
        "https://stackoverflow.com/",
        "https://pypi.org/",
        "https://cran.r-project.org/",
        "https://www.datacamp.com/"
    ]
    # regular sync functions will get run in a different thread
    def get_page_sync(url, i):
        """get the number of characters in the HTML for each page (synchronously)"""
        r = requests.get(url)
        return len(r.text)

    # or better yet, use async functions instead
    async def get_page_async(url, i):
        """get the number of characters in the HTML for each page (asynchronously)"""
        r = await asks.get(url)
        return len(r.text)

    asks.init("trio")  # tell asks to use trio as its event loop (required by older asks versions)
    results = trio.run(concurrent_map, URLs, get_page_sync, 3)
    print(results)
    results = trio.run(concurrent_map, URLs, get_page_async, 3)
    print(results)
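For reference, concurrent_map can also be awaited from inside another trio task instead of being passed to trio.run directly. A brief sketch (the main function here is illustrative only, not part of the gist):

async def main():
    lengths = await concurrent_map(URLs, get_page_async, conc_limit=2)
    print(dict(zip(URLs, lengths)))

# trio.run(main)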