Last active
August 22, 2018 20:14
-
-
Save acarl005/b10be4a51ad37754512cea305c817156 to your computer and use it in GitHub Desktop.
How to use Python to run an async task over a variable number of inputs, using a pool-style concurrency limit so that only a bounded number of tasks run at once. Similar to `Promise.map` with the `concurrency` option in Bluebird (a JavaScript promise library).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio # async task synchronization library | |
import asks # async version of `requests` | |
from inspect import iscoroutinefunction | |
import requests | |
async def concurrent_map(inputs, task, conc_limit=1000):
    """
    Like `map`, but async: run `task` once for each item in `inputs`, with at most
    `conc_limit` calls in flight at any time. Returns the outputs in input order.

    args:
        inputs - iterable of items passed to `task`. Any iterable is accepted,
                 including generators; it is materialized into a list first.
        task - called as `task(item, index)`. A coroutine function is awaited
               directly. Any other callable is run in a worker thread.
        conc_limit - maximum number of tasks running at once (a positive integer)
    returns:
        list of the return values of each task call, in the same order as `inputs`

    If any task raises, trio cancels the remaining tasks and the exception
    propagates out of this call. Depending on the trio version, it may be
    wrapped in an exception group.
    """
    # Materialize once so generators and iterators work. The length is needed
    # up front to pre-size `results`, and a one-shot iterator could not be
    # traversed a second time.
    inputs = list(inputs)
    results = [None] * len(inputs)
    # A single shared limiter caps how many tasks hold a slot concurrently.
    limit = trio.CapacityLimiter(conc_limit)
    async with trio.open_nursery() as nursery:
        for i, item in enumerate(inputs):
            # Each child writes only results[i], which preserves input order.
            nursery.start_soon(_work, limit, task, item, i, results)
    # Exiting the nursery waits for every child, so `results` is fully populated.
    return results
async def _work(limit, task, item, i, results):
    """Helper for concurrent_map: run `task(item, i)` while holding a slot of
    `limit`, and store its return value in `results[i]`.

    Coroutine functions are awaited on the trio event loop. Plain synchronous
    callables are offloaded to a worker thread so they do not block the loop.
    """
    async with limit:
        print('getting', i)
        if iscoroutinefunction(task):
            output = await task(item, i)
        else:
            to_thread = getattr(trio, "to_thread", None)
            if to_thread is not None:
                # trio 0.12 deprecated `run_sync_in_worker_thread` in favor of
                # `trio.to_thread.run_sync`; later releases removed the old name.
                output = await to_thread.run_sync(task, item, i)
            else:
                # Older trio releases only provide the original API.
                output = await trio.run_sync_in_worker_thread(task, item, i)
        # This assignment runs in the trio task, not the worker thread, and each
        # task owns a distinct index, so no locking is needed.
        results[i] = output
        print('finished', i)
if __name__ == "__main__":
    # some URLs to fetch
    URLs = [
        "https://google.com/",
        "https://en.wikipedia.org/",
        "https://stackoverflow.com/",
        "https://pypi.org/",
        "https://cran.r-project.org/",
        "https://www.datacamp.com/",
    ]
    # Per-request timeout in seconds. Without one, a stalled server can hang the
    # demo indefinitely, and in the sync version it keeps a worker thread blocked.
    TIMEOUT = 30

    # regular sync functions will get run in a different thread
    def get_page_sync(url, i):
        """Return the number of characters in the page's HTML (blocking, via requests).

        `i` is the item's index, passed by concurrent_map; it is unused here.
        """
        r = requests.get(url, timeout=TIMEOUT)
        return len(r.text)

    # or better yet, use async functions instead
    async def get_page_async(url, i):
        """Return the number of characters in the page's HTML (non-blocking, via asks).

        `i` is the item's index, passed by concurrent_map; it is unused here.
        """
        r = await asks.get(url, timeout=TIMEOUT)
        return len(r.text)

    # Older asks releases must be told which event loop to use via `init`.
    # Newer releases detect it automatically and no longer define `init`,
    # so only call it when it exists.
    if hasattr(asks, "init"):
        asks.init("trio")

    # Fetch at most 3 pages concurrently in each run.
    results = trio.run(concurrent_map, URLs, get_page_sync, 3)
    print(results)
    results = trio.run(concurrent_map, URLs, get_page_async, 3)
    print(results)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment