Skip to content

Instantly share code, notes, and snippets.

@the-moog
Created April 22, 2022 12:04
Show Gist options
  • Save the-moog/415965a83074366d386db2baf5b6a371 to your computer and use it in GitHub Desktop.
Save the-moog/415965a83074366d386db2baf5b6a371 to your computer and use it in GitHub Desktop.
Using concurrent.futures to call secondary Python scripts with parameters and collect the results
# inner.py
# The sys module is deliberately not imported here: the calling script
# injects it (together with 'restarted' and 'n') into this script's globals.
import random
import time

script_name = sys.argv[0]

# A restarted run gets a different label and a longer base sleep time.
if restarted:
    state = "!! RESTARTED !!"
    runtime = 15
else:
    state = "loaded"
    runtime = 1

print(f"Script {script_name} {state}")

if __name__ == "__main__":
    argv = sys.argv[1:]
    print(f"..... Run with argv: {argv} and n={n}")
    # Simulate work: the base runtime plus 0 to 5 extra seconds
    time.sleep(runtime + random.randint(0, 5))
    # Stays in this script's globals, which is where the caller reads it from
    return_value = "Done"
import runpy
import sys
import concurrent.futures
import random
def run_another(n, restarted=False):
    """
    Execute inner.py as "__main__" with parameters and return its globals.

    The value n is handed over twice: as the last entry of sys.argv and as
    a pre-set global named 'n'. The 'restarted' flag and the sys module
    itself are pre-set as globals as well.
    """
    # NOTE(review): sys.argv is a single list shared by the whole process,
    # and this function is submitted to a ThreadPoolExecutor, so concurrent
    # calls can overwrite each other's last argv entry - confirm this is
    # acceptable for the intended use.
    if len(sys.argv) != 1:
        # Overwrite whatever currently sits in the last slot
        sys.argv[-1] = n
    else:
        # Only one entry is present, so add a slot for n
        sys.argv.append(n)

    # Names that already exist in the script's namespace when it starts
    preset = {'restarted': restarted, 'sys': sys, 'n': n}

    # run_path gives back the script's resulting globals dictionary
    return runpy.run_path("inner.py", init_globals=preset, run_name="__main__")
def create(n, restarted=False):
    """
    Submit run_another(n, restarted) to the executor and record the future.

    The new future is stored in the module-level 'futures' mapping as
    future -> n, and is also returned to the caller.
    """
    job = executor.submit(run_another, n, restarted)
    futures.update({job: n})
    return job
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Maps every submitted future to the n it was created with.
    # create() adds its entries to this module-level dictionary.
    futures = {}

    # Create 100 futures
    for n in range(100):
        create(n)

    # Try to cancel 20 distinct, randomly chosen futures.
    # random.sample never picks the same future twice, so 20 different
    # futures are targeted. cancel() only succeeds for futures that have not
    # started running yet, so fewer than 20 may actually end up cancelled.
    future_list = list(futures)
    for victim in random.sample(future_list, min(20, len(future_list))):
        victim.cancel()

    # Randomly restart one cancelled future - if any cancel succeeded at all
    cancelled_list = [future for future in futures if future.cancelled()]
    if cancelled_list:
        restart = random.choice(cancelled_list)
        index = futures[restart]
        print(f"Restarting {index}")
        create(index, True)
    else:
        # Every selected future was already running or finished
        print("No future could be cancelled, nothing to restart")

    # Visit all the futures as they finish and report their return data
    for future in concurrent.futures.as_completed(futures):
        n = futures[future]
        try:
            data = future.result()
        except concurrent.futures.CancelledError:
            print(f"{n} was CANCELLED!!")
        except Exception as exc:
            print(f"{n} generated an exception: {exc}")
        else:
            print(f"{n} Returned {data['return_value']} {data['n']}")
            # Prove this future is the one we expected
            assert n == data['n']
@the-moog
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment