Using multi-processing with progress bar
import itertools
import multiprocessing
import multiprocessing.pool as mp
# gensim.utils.chunkize_serial
def chunkize_serial(iterable, chunksize, as_numpy=False):
    """
    Return elements from the iterable in `chunksize`-ed lists. The last returned
    element may be smaller (if the length of the collection is not divisible by
    `chunksize`).

    >>> print(list(chunkize_serial(range(10), 3)))
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    """
    import numpy  # deferred import, so the global namespace stays free of numpy
    it = iter(iterable)
    while True:
        if as_numpy:
            # convert each document to a 2d numpy array (~6x faster when transmitting
            # chunk data over the wire, in Pyro)
            wrapped_chunk = [[numpy.array(doc) for doc in itertools.islice(it, int(chunksize))]]
        else:
            wrapped_chunk = [list(itertools.islice(it, int(chunksize)))]
        if not wrapped_chunk[0]:
            break
        # memory opt: wrap the chunk and then pop(), to avoid leaving behind a dangling reference
        yield wrapped_chunk.pop()
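
# A minimal sketch (not part of the original gist) of the `as_numpy=True` path:
# each document inside a chunk is converted to a numpy array before the chunk is
# yielded. `_demo_chunkize_serial_numpy` is a hypothetical helper added here for
# illustration only.
def _demo_chunkize_serial_numpy():
    docs = [[1, 2], [3, 4], [5, 6]]
    for chunk in chunkize_serial(docs, 2, as_numpy=True):
        print([type(doc).__name__ for doc in chunk])  # ['ndarray', 'ndarray'], then ['ndarray']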
# gensim.utils.InputQueue
class InputQueue(multiprocessing.Process):
    def __init__(self, q, corpus, chunksize, maxsize, as_numpy):
        super(InputQueue, self).__init__()
        self.q = q
        self.maxsize = maxsize
        self.corpus = corpus
        self.chunksize = chunksize
        self.as_numpy = as_numpy

    def run(self):
        if self.as_numpy:
            import numpy  # don't clutter the global namespace with a dependency on numpy
        it = iter(self.corpus)
        while True:
            chunk = itertools.islice(it, self.chunksize)
            if self.as_numpy:
                # HACK XXX convert documents to numpy arrays, to save memory.
                # This also gives a scipy warning at runtime:
                # "UserWarning: indices array has non-integer dtype (float64)"
                wrapped_chunk = [[numpy.asarray(doc) for doc in chunk]]
            else:
                wrapped_chunk = [list(chunk)]
            if not wrapped_chunk[0]:
                # a `None` sentinel tells the consumer that the corpus is exhausted
                self.q.put(None, block=True)
                break
            try:
                qsize = self.q.qsize()
            except NotImplementedError:
                qsize = '?'  # qsize() is not implemented on some platforms (e.g. macOS)
            self.q.put(wrapped_chunk.pop(), block=True)
# gensim.utils.chunkize
def chunkize(corpus, chunksize, maxsize=0, as_numpy=False):
    """
    Split a stream of values into smaller chunks.
    Each chunk is of length `chunksize`, except the last one which may be smaller.
    A once-only input stream (`corpus` from a generator) is ok, chunking is done
    efficiently via itertools.

    If `maxsize > 1`, don't wait idly in between successive chunk `yields`, but
    rather keep filling a short queue (of size at most `maxsize`) with forthcoming
    chunks in advance. This is realized by starting a separate process, and is
    meant to reduce I/O delays, which can be significant when `corpus` comes
    from a slow medium (like a hard disk).

    If `maxsize == 0`, don't bother with parallelism and simply yield chunks
    serially via `chunkize_serial()` (no I/O optimizations).

    >>> for chunk in chunkize(range(10), 4): print(chunk)
    [0, 1, 2, 3]
    [4, 5, 6, 7]
    [8, 9]
    """
    assert chunksize > 0
    if maxsize > 0:
        q = multiprocessing.Queue(maxsize=maxsize)
        worker = InputQueue(q, corpus, chunksize, maxsize=maxsize, as_numpy=as_numpy)
        worker.daemon = True
        worker.start()
        while True:
            # a `None` from the worker marks the end of the corpus
            chunk = [q.get(block=True)]
            if chunk[0] is None:
                break
            yield chunk.pop()
    else:
        for chunk in chunkize_serial(corpus, chunksize, as_numpy=as_numpy):
            yield chunk
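
# A minimal sketch (not part of the original gist) of the prefetching path: with
# `maxsize > 0`, a background `InputQueue` process keeps up to `maxsize` chunks
# queued while the consumer is still working on the current one.
# `_demo_chunkize_prefetch` is a hypothetical helper added here for illustration only.
def _demo_chunkize_prefetch():
    for chunk in chunkize(range(10), 4, maxsize=2):
        print(chunk)
    # expected output:
    # [0, 1, 2, 3]
    # [4, 5, 6, 7]
    # [8, 9]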
def map_async(func, iterable, processes=4, scale=10, show_progress=False):
    """
    Apply function `func` to all items in an iterable with multiple processes.

    A pool of processes is dispatched initially, then destroyed after the job is done.

    Normally, tracking multi-process progress can cause race conditions, and thus
    it is tricky to implement. A workaround is to chunk the entire sequence into
    smaller, manageable pieces, apply multi-processing to each chunk, and update
    the progress once per chunk. The chunking algorithm is extracted from
    `gensim.utils`, which was originally developed for various data processing
    pipelines, such as Wikipedia preprocessing.

    Arguments:
        func: (callable) a function that takes each item in the iterable and returns
            a corresponding transformation; it must be picklable (e.g. defined at
            module level), since it is sent to the worker processes
        iterable: (iterator-like object) an object that supports `__iter__`
        processes: (int) number of processes to be dispatched
        scale: (int) size of each chunk with respect to the number of processes
            (i.e. chunk_size = scale * processes)
        show_progress: (bool) if enabled, a `tqdm` progress bar will be displayed
            (run `pip install tqdm` if not installed)

    Returns:
        A list of transformed items (as long as the input iterable).

    Example:
        >>> from map_async import map_async
        >>> def square(x): return x ** 2
        >>> map_async(square, range(10))
        [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        >>> map_async(square, range(10), show_progress=True)
        100%|█████████████████████████████████████████| 1/1 [00:00<00:00, 9.79it/s]
        [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    """
    chunks = list(chunkize(iterable, processes * scale))
    pool = mp.Pool(processes)
    ret = []
    progress = None
    if show_progress:
        import tqdm  # deferred import: tqdm is only needed when show_progress=True
        progress = tqdm.tqdm(total=len(chunks))
    try:
        for chunk in chunks:
            ret.extend(pool.map(func, chunk))
            if progress is not None:
                progress.update(1)
    finally:
        pool.close()
        pool.join()
        if progress is not None:
            progress.close()
    return ret
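
# A minimal usage sketch (not part of the original gist). `_double` is a
# hypothetical helper; the callable passed to `map_async` has to live at module
# level so that worker processes can unpickle it. The `__main__` guard matters on
# platforms that spawn (rather than fork) worker processes.
def _double(x):
    return x * 2

if __name__ == "__main__":
    results = map_async(_double, range(1000), processes=4, show_progress=True)
    print(results[:5])  # [0, 2, 4, 6, 8]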