Skip to content

Instantly share code, notes, and snippets.

@zhum
Created December 2, 2022 13:47
Show Gist options
  • Save zhum/35c7dd41f3b60860b990656818df05d2 to your computer and use it in GitHub Desktop.
Save zhum/35c7dd41f3b60860b990656818df05d2 to your computer and use it in GitHub Desktop.
Wrapper to execute one function with different arguments in parallel, using execution pools
import concurrent.futures
import functools
import queue
import subprocess
import random
def pool_wrapper(q_in, q_out, size=5):
def pool_wrapper_inner(func):
@functools.wraps(func)
def my_wrapper(*args, **kwargs):
q = queue.Queue(q_in.qsize())
with concurrent.futures.ThreadPoolExecutor(max_workers=size) as executor:
while not q_in.empty():
data = q_in.get()
try:
q.put(executor.submit(func, data)) # , *args, **kwargs)
except Exception as e:
print(f"Exception {e}")
finally:
q_in.task_done()
while not q.empty():
res = q.get().result()
if q_out is not None:
q_out.put(res)
return my_wrapper
return pool_wrapper_inner
# ========= example
q1 = queue.Queue(maxsize=20)
q2 = queue.Queue(maxsize=20)
# q2 is OPTIONAL, you can use None
@pool_wrapper(q1, q2, 2) # in, out, pool-size(5 by default)
def my_test(n):
print(f"sleep {n}")
subprocess.run(["sleep", str(n)])
return n
# Create input data
for i in range(20):
q1.put(random.randrange(6))
# Call!
my_test()
# Get results
while not q2.empty():
print(f"{q2.get()}")
q2.task_done()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment