Created
December 2, 2022 13:47
-
-
Save zhum/35c7dd41f3b60860b990656818df05d2 to your computer and use it in GitHub Desktop.
Wrapper to execute one function with different arguments in parallel, using execution pools
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
import functools | |
import queue | |
import subprocess | |
import random | |
def pool_wrapper(q_in, q_out, size=5): | |
def pool_wrapper_inner(func): | |
@functools.wraps(func) | |
def my_wrapper(*args, **kwargs): | |
q = queue.Queue(q_in.qsize()) | |
with concurrent.futures.ThreadPoolExecutor(max_workers=size) as executor: | |
while not q_in.empty(): | |
data = q_in.get() | |
try: | |
q.put(executor.submit(func, data)) # , *args, **kwargs) | |
except Exception as e: | |
print(f"Exception {e}") | |
finally: | |
q_in.task_done() | |
while not q.empty(): | |
res = q.get().result() | |
if q_out is not None: | |
q_out.put(res) | |
return my_wrapper | |
return pool_wrapper_inner | |
# ========= example | |
q1 = queue.Queue(maxsize=20) | |
q2 = queue.Queue(maxsize=20) | |
# q2 is OPTIONAL, you can use None | |
@pool_wrapper(q1, q2, 2) # in, out, pool-size(5 by default) | |
def my_test(n): | |
print(f"sleep {n}") | |
subprocess.run(["sleep", str(n)]) | |
return n | |
# Create input data | |
for i in range(20): | |
q1.put(random.randrange(6)) | |
# Call! | |
my_test() | |
# Get results | |
while not q2.empty(): | |
print(f"{q2.get()}") | |
q2.task_done() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment