Created
May 8, 2019 21:33
-
-
Save austospumanto/694723b9a4d5c1ca843b88e3460765ec to your computer and use it in GitHub Desktop.
Executing CPU-intensive workloads via multiprocessing.Process sub-processes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
import pickle | |
import struct | |
from typing import Optional, Callable, List, Any | |
from tqdm import tqdm | |
import pickle | |
import struct | |
import warnings | |
from multiprocessing.connection import Connection | |
from typing import Optional | |
def exec_proc(fn, child_conn: Optional[Connection] = None): | |
warnings.catch_warnings(record=False) | |
try: | |
res = fn() | |
if child_conn is not None: | |
pickled_bytes: bytes = pickle.dumps(res, protocol=pickle.HIGHEST_PROTOCOL) | |
length_as_bytes: bytes = struct.pack("<Q", len(pickled_bytes)) | |
# noinspection PyUnresolvedReferences | |
child_conn._send(length_as_bytes + pickled_bytes) | |
else: | |
return res | |
except Exception as e: | |
print(f"Exception when executing function in subprocess. Error: {repr(e)}") | |
raise e | |
def execute_concurrently_without_pool( | |
*fns: Callable, desc: Optional[str] = None | |
) -> List[Any]: | |
results = [None] * len(fns) | |
tq = tqdm(total=len(fns), desc=desc) | |
procs = [] # : List[multiprocessing.Process] | |
parent_conns = [] # s: List[Connection] | |
child_conns = [] # s: List[Connection] | |
for fn in fns: | |
parent_conn, child_conn = multiprocessing.Pipe(duplex=False) | |
proc = multiprocessing.Process(target=exec_proc, args=(fn, child_conn)) | |
proc.start() | |
procs.append(proc) | |
parent_conns.append(parent_conn) | |
child_conns.append(child_conn) | |
unfinished_proc_idxs = set(range(len(procs))) # : Set[int] | |
while unfinished_proc_idxs: | |
for proc_idx in list(unfinished_proc_idxs): | |
parent_conn = parent_conns[proc_idx] | |
if parent_conn.poll(0.01): | |
# noinspection PyUnresolvedReferences | |
binary_length = parent_conn._recv(8).getvalue() # : bytes | |
length = struct.unpack("<Q", binary_length)[0] # : int | |
# noinspection PyUnresolvedReferences | |
pickled_bytes = parent_conn._recv(length).getvalue() | |
results[proc_idx] = pickle.loads(pickled_bytes) | |
procs[proc_idx].join(timeout=None) | |
unfinished_proc_idxs.discard(proc_idx) | |
parent_conn.close() | |
child_conns[proc_idx].close() | |
tq.update(1) | |
tq.close() | |
return results |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment