Skip to content

Instantly share code, notes, and snippets.

@zhou13
Last active August 10, 2019 06:32
Show Gist options
  • Select an option

  • Save zhou13/9b5ad0bd7482fdb113bfce62fdd6aacd to your computer and use it in GitHub Desktop.

Select an option

Save zhou13/9b5ad0bd7482fdb113bfce62fdd6aacd to your computer and use it in GitHub Desktop.
import multiprocessing
def _transportable_error(exc):
    """Return ``exc`` if it survives a pickle round trip, else a stand-in.

    Exceptions travel from a worker to the parent through a
    ``multiprocessing.Queue``, which pickles them.  An exception that cannot
    be pickled (or cannot be rebuilt on the other side) is replaced by a
    ``RuntimeError`` carrying the original type name and message, so the
    parent still receives a failure report for that item.
    """
    import pickle  # local import: only needed on the failure path

    try:
        pickle.loads(pickle.dumps(exc))
    except Exception:
        return RuntimeError("{}: {}".format(type(exc).__name__, exc))
    return exc


def __parallel_handle(f, q_in, q_out):
    """Worker loop: apply ``f`` to queued items until a sentinel arrives.

    Reads ``(index, item)`` pairs from ``q_in``.  An index of ``None`` is the
    stop signal.  For every real item exactly one message is posted to
    ``q_out``:

    * ``(index, True, f(item))`` when the call succeeds;
    * ``(index, False, exception)`` when ``f`` raises an ``Exception``.

    Reporting failures instead of letting the process die means the parent
    always receives one message per submitted item.
    """
    while True:
        i, x = q_in.get()
        if i is None:
            break
        try:
            outcome = (i, True, f(x))
        except Exception as exc:
            outcome = (i, False, _transportable_error(exc))
        q_out.put(outcome)


def parmap(f, X, max_workers=multiprocessing.cpu_count(), use_tqdm=True):
    """Apply ``f`` to every element of ``X`` in worker processes.

    Args:
        f: Callable taking one element of ``X``.  It runs in child processes.
        X: Iterable of inputs; it is materialised into a list up front.
        max_workers: Upper bound on the number of worker processes.  ``0`` or
            ``None`` means ``multiprocessing.cpu_count()``.  No more workers
            than there are items are started.
        use_tqdm: When true, ``tqdm`` is imported and wraps the submission
            loop, so the bar tracks items handed to workers.

    Returns:
        A list with ``f(x)`` for each ``x`` in ``X``, in input order.

    Raises:
        ValueError: If ``max_workers`` is negative.
        Exception: If ``f`` raised for one or more items, the exception
            reported for the lowest failing index is re-raised here after all
            items have been processed.
    """
    if use_tqdm:
        from tqdm import tqdm
    else:

        def tqdm(x, **params):
            return x

    if not max_workers:
        max_workers = multiprocessing.cpu_count()
    elif max_workers < 0:
        # range(negative) would start no workers and the second put() below
        # would then block forever on the size-1 input queue.
        raise ValueError("max_workers must be non-negative")

    items = list(X)
    if not items:
        return []
    n_workers = min(max_workers, len(items))

    # The input queue holds a single item so submission (and therefore the
    # progress bar) advances only as workers pick items up.
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=__parallel_handle, args=(f, q_in, q_out))
        for _ in range(n_workers)
    ]
    for p in workers:
        p.daemon = True
        p.start()

    results = [None] * len(items)
    failures = {}
    try:
        for i, x in enumerate(tqdm(items, dynamic_ncols=True, smoothing=0)):
            q_in.put((i, x))
        # One stop sentinel per worker.
        for _ in workers:
            q_in.put((None, None))
        # Workers post exactly one message per item, success or failure.
        for _ in items:
            i, ok, value = q_out.get()
            if ok:
                results[i] = value
            else:
                failures[i] = value
        for p in workers:
            p.join()
    except BaseException:
        # Covers KeyboardInterrupt as well: stop the children and do not let
        # interpreter exit wait on data still buffered for the input queue.
        for p in workers:
            p.terminate()
        q_in.cancel_join_thread()
        raise
    finally:
        q_in.close()
        q_out.close()

    if failures:
        # Lowest index first so the reported error is deterministic.
        raise failures[min(failures)]
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment