Created
November 1, 2019 11:25
-
-
Save eriseven/24bf1c61052a1b36ed68970251a60d20 to your computer and use it in GitHub Desktop.
Async and await with subprocesses
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Async and await example using subprocesses | |
Note: | |
Requires Python 3.6. | |
""" | |
import sys | |
import time | |
import platform | |
import asyncio | |
from pprint import pprint | |
async def run_command(*args): | |
"""Run command in subprocess. | |
Example from: | |
http://asyncio.readthedocs.io/en/latest/subprocess.html | |
""" | |
# Create subprocess | |
process = await asyncio.create_subprocess_exec( | |
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | |
) | |
# Status | |
print("Started: %s, pid=%s" % (args, process.pid), flush=True) | |
# Wait for the subprocess to finish | |
stdout, stderr = await process.communicate() | |
# Progress | |
if process.returncode == 0: | |
print( | |
"Done: %s, pid=%s, result: %s" | |
% (args, process.pid, stdout.decode().strip()), | |
flush=True, | |
) | |
else: | |
print( | |
"Failed: %s, pid=%s, result: %s" | |
% (args, process.pid, stderr.decode().strip()), | |
flush=True, | |
) | |
# Result | |
result = stdout.decode().strip() | |
# Return stdout | |
return result | |
async def run_command_shell(command): | |
"""Run command in subprocess (shell). | |
Note: | |
This can be used if you wish to execute e.g. "copy" | |
on Windows, which can only be executed in the shell. | |
""" | |
# Create subprocess | |
process = await asyncio.create_subprocess_shell( | |
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | |
) | |
# Status | |
print("Started:", command, "(pid = " + str(process.pid) + ")", flush=True) | |
# Wait for the subprocess to finish | |
stdout, stderr = await process.communicate() | |
# Progress | |
if process.returncode == 0: | |
print("Done:", command, "(pid = " + str(process.pid) + ")", flush=True) | |
else: | |
print( | |
"Failed:", command, "(pid = " + str(process.pid) + ")", flush=True | |
) | |
# Result | |
result = stdout.decode().strip() | |
# Return stdout | |
return result | |
def make_chunks(l, n): | |
"""Yield successive n-sized chunks from l. | |
Note: | |
Taken from https://stackoverflow.com/a/312464 | |
""" | |
if sys.version_info.major == 2: | |
for i in xrange(0, len(l), n): | |
yield l[i : i + n] | |
else: | |
# Assume Python 3 | |
for i in range(0, len(l), n): | |
yield l[i : i + n] | |
def run_asyncio_commands(tasks, max_concurrent_tasks=0): | |
"""Run tasks asynchronously using asyncio and return results. | |
If max_concurrent_tasks are set to 0, no limit is applied. | |
Note: | |
By default, Windows uses SelectorEventLoop, which does not support | |
subprocesses. Therefore ProactorEventLoop is used on Windows. | |
https://docs.python.org/3/library/asyncio-eventloops.html#windows | |
""" | |
all_results = [] | |
if max_concurrent_tasks == 0: | |
chunks = [tasks] | |
num_chunks = len(chunks) | |
else: | |
chunks = make_chunks(l=tasks, n=max_concurrent_tasks) | |
num_chunks = len(list(make_chunks(l=tasks, n=max_concurrent_tasks))) | |
if asyncio.get_event_loop().is_closed(): | |
asyncio.set_event_loop(asyncio.new_event_loop()) | |
if platform.system() == "Windows": | |
asyncio.set_event_loop(asyncio.ProactorEventLoop()) | |
loop = asyncio.get_event_loop() | |
chunk = 1 | |
for tasks_in_chunk in chunks: | |
print( | |
"Beginning work on chunk %s/%s" % (chunk, num_chunks), flush=True | |
) | |
commands = asyncio.gather(*tasks_in_chunk) # Unpack list using * | |
results = loop.run_until_complete(commands) | |
all_results += results | |
print( | |
"Completed work on chunk %s/%s" % (chunk, num_chunks), flush=True | |
) | |
chunk += 1 | |
loop.close() | |
return all_results | |
def main(): | |
"""Main program.""" | |
start = time.time() | |
if platform.system() == "Windows": | |
# Commands to be executed on Windows | |
commands = [["hostname"]] | |
else: | |
# Commands to be executed on Unix | |
commands = [["du", "-sh", "/var/tmp"], ["hostname"]] | |
tasks = [] | |
for command in commands: | |
tasks.append(run_command(*command)) | |
# # Shell execution example | |
# tasks = [run_command_shell('copy c:/somefile d:/new_file')] | |
# # List comprehension example | |
# tasks = [ | |
# run_command(*command, get_project_path(project)) | |
# for project in accessible_projects(all_projects) | |
# ] | |
results = run_asyncio_commands( | |
tasks, max_concurrent_tasks=20 | |
) # At most 20 parallel tasks | |
print("Results:") | |
pprint(results) | |
end = time.time() | |
rounded_end = "{0:.4f}".format(round(end - start, 4)) | |
print("Script ran in about %s seconds" % (rounded_end), flush=True) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment