Created
April 22, 2022 12:04
-
-
Save the-moog/415965a83074366d386db2baf5b6a371 to your computer and use it in GitHub Desktop.
Using concurrent.futures to call secondary Python scripts with parameters and collect the results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# inner.py | |
# Note we did not load sys module, it gets passed to us | |
script_name = sys.argv[0] | |
import time | |
import random | |
state = "!! RESTARTED !!" if restarted else "loaded" | |
runtime = 1 if not restarted else 15 | |
print(f"Script {script_name} {state}") | |
if __name__ == "__main__": | |
argv = sys.argv[1:] | |
print(f"..... Run with argv: {argv} and n={n}") | |
time.sleep(runtime + random.randint(0, 5)) | |
return_value = "Done" | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import runpy | |
import sys | |
import concurrent.futures | |
import random | |
def run_another(n, restarted=False): | |
""" | |
Utility function to set parameters and run the script | |
""" | |
# Prepare some parameters to pass | |
globs = {} | |
if len(sys.argv) == 1: | |
sys.argv += [n] | |
else: | |
sys.argv[-1] = n | |
globs['restarted'] = restarted | |
globs['sys'] = sys | |
globs.update({'n': n}) | |
ret = runpy.run_path("inner.py", init_globals = globs, run_name="__main__") | |
# return the result (a dictionary of globals) | |
return ret | |
def create(n, restarted=False): | |
""" | |
Utility function to help create futures | |
""" | |
future = executor.submit(run_another, n, restarted) | |
futures[future] = n | |
return future | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
futures = {} | |
# Create 100 futures | |
for n in range(100): | |
create(n) | |
# Randomly kill off 20 of those futures | |
future_list = list(futures.keys()) | |
for n in range(20): | |
index = random.randint(0,len(futures)-1) | |
future_list[index].cancel() | |
# Randomly restart one cancelled future | |
cancelled_list = [future for future in futures.keys() if future.cancelled()] | |
restart = cancelled_list[random.randint(0, len(cancelled_list)-1)] | |
index = futures[restart] | |
print(f"Restarting {index}") | |
create(index, True) | |
# For the all the completed futures and get their return data as they finish | |
for future in concurrent.futures.as_completed(futures): | |
n = futures[future] | |
try: | |
data = future.result() | |
except concurrent.futures.CancelledError: | |
print(f"{n} was CANCELLED!!") | |
except Exception as exc: | |
print(f"{n} generated an exception: {exc}") | |
else: | |
print(f"{n} Returned {data['return_value']} {data['n']}") | |
# Prove this future is the one we expected | |
assert n == data['n'] | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
As an answer to https://stackoverflow.com/q/71728548/1364242