@kaniblu
Last active February 22, 2019 04:00
map_async.py: using multiprocessing with a progress bar
import itertools
import multiprocessing
import multiprocessing.pool as mp
import tqdm


# gensim.utils.chunkize_serial
def chunkize_serial(iterable, chunksize, as_numpy=False):
    """
    Return elements from the iterable in `chunksize`-ed lists. The last returned
    element may be smaller (if the length of the collection is not divisible by
    `chunksize`).

    >>> print(list(chunkize_serial(range(10), 3)))
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    """
    import numpy
    it = iter(iterable)
    while True:
        if as_numpy:
            # convert each document to a 2d numpy array (~6x faster when transmitting
            # chunk data over the wire, in Pyro)
            wrapped_chunk = [[numpy.array(doc) for doc in itertools.islice(it, int(chunksize))]]
        else:
            wrapped_chunk = [list(itertools.islice(it, int(chunksize)))]
        if not wrapped_chunk[0]:
            break
        # memory opt: wrap the chunk and then pop(), to avoid leaving behind a dangling reference
        yield wrapped_chunk.pop()


# gensim.utils.InputQueue
class InputQueue(multiprocessing.Process):
    def __init__(self, q, corpus, chunksize, maxsize, as_numpy):
        super(InputQueue, self).__init__()
        self.q = q
        self.maxsize = maxsize
        self.corpus = corpus
        self.chunksize = chunksize
        self.as_numpy = as_numpy

    def run(self):
        if self.as_numpy:
            import numpy  # don't clutter the global namespace with a dependency on numpy
        it = iter(self.corpus)
        while True:
            chunk = itertools.islice(it, self.chunksize)
            if self.as_numpy:
                # HACK XXX convert documents to numpy arrays, to save memory.
                # This also gives a scipy warning at runtime:
                # "UserWarning: indices array has non-integer dtype (float64)"
                wrapped_chunk = [[numpy.asarray(doc) for doc in chunk]]
            else:
                wrapped_chunk = [list(chunk)]
            if not wrapped_chunk[0]:
                # corpus exhausted: signal the consumer and stop
                self.q.put(None, block=True)
                break
            try:
                qsize = self.q.qsize()
            except NotImplementedError:
                qsize = '?'  # qsize() is not implemented on some platforms (e.g. macOS)
            # qsize is unused below; gensim reports it in a debug log message at this point
            self.q.put(wrapped_chunk.pop(), block=True)


# gensim.utils.chunkize
def chunkize(corpus, chunksize, maxsize=0, as_numpy=False):
    """
    Split a stream of values into smaller chunks.
    Each chunk is of length `chunksize`, except the last one, which may be smaller.
    A once-only input stream (`corpus` from a generator) is ok; chunking is done
    efficiently via itertools.

    If `maxsize > 1`, don't wait idly in between successive chunk `yields`, but
    rather keep filling a short queue (of size at most `maxsize`) with forthcoming
    chunks in advance. This is realized by starting a separate process, and is
    meant to reduce I/O delays, which can be significant when `corpus` comes
    from a slow medium (like a hard disk).

    If `maxsize == 0`, don't bother with parallelism and simply yield chunks
    via `chunkize_serial()` (no I/O optimizations).

    >>> for chunk in chunkize(range(10), 4): print(chunk)
    [0, 1, 2, 3]
    [4, 5, 6, 7]
    [8, 9]
    """
    assert chunksize > 0
    if maxsize > 0:
        q = multiprocessing.Queue(maxsize=maxsize)
        worker = InputQueue(q, corpus, chunksize, maxsize=maxsize, as_numpy=as_numpy)
        worker.daemon = True
        worker.start()
        while True:
            chunk = [q.get(block=True)]
            if chunk[0] is None:
                break
            yield chunk.pop()
    else:
        for chunk in chunkize_serial(corpus, chunksize, as_numpy=as_numpy):
            yield chunk
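

# Illustrative sketch (added, not part of the original gist): with `maxsize > 0`,
# chunkize() starts an InputQueue process that prefetches up to `maxsize` chunks while
# the caller is still consuming earlier ones, hiding latency when the corpus is slow to
# read. `demo_prefetch` and `slow_corpus` are hypothetical names used only for this
# example; because the chunks are produced in a separate process, a generator corpus
# like this one assumes a fork-based platform (e.g. Linux), otherwise the corpus must
# be picklable.
def demo_prefetch(n=10):
    import time

    def slow_corpus():
        for i in range(n):
            time.sleep(0.01)  # simulate a slow source, e.g. reading from disk
            yield i

    # chunks of 4 items, with at most 2 chunks buffered ahead of the consumer
    return list(chunkize(slow_corpus(), chunksize=4, maxsize=2))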


def map_async(func, iterable, processes=4, scale=10, show_progress=False):
    """
    Apply function `func` to every item of an iterable using multiple processes.
    A pool of processes is dispatched at the start and destroyed once the job is
    done.

    Tracking progress across multiple processes is prone to race conditions and
    therefore tricky to implement directly. The workaround used here is to split
    the input into smaller, manageable chunks, run the process pool on one chunk
    at a time, and advance the progress bar once per finished chunk. The chunking
    code is extracted from `gensim.utils`, where it was originally developed for
    data processing pipelines such as Wikipedia preprocessing.

    Arguments:
        func: (callable) a function that takes an item from the iterable and
            returns the corresponding transformation; it must be picklable
            (e.g. a module-level function rather than a lambda), since it is
            sent to the worker processes
        iterable: (iterator-like object) an object that supports `__iter__`
        processes: (int) number of processes to be dispatched
        scale: (int) size of each chunk with respect to the number of processes
            (i.e. chunk_size = scale * processes)
        show_progress: (bool) if enabled, a `tqdm` progress bar will be displayed
            (run `pip install tqdm` if not installed)

    Returns:
        A list of transformed items (same length as the input iterable)

    Example:
        >>> from map_async import map_async
        >>> def square(x):
        ...     return x ** 2
        >>> map_async(square, range(10))
        [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        >>> map_async(square, range(10), show_progress=True)
        100%|█████████████████████████████████████████| 1/1 [00:00<00:00, 9.79it/s]
        [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    """
    chunks = list(chunkize(iterable, processes * scale))
    pool = mp.Pool(processes)
    ret = []
    progress = None
    if show_progress:
        import tqdm
        progress = tqdm.tqdm(total=len(chunks))
    try:
        for chunk in chunks:
            ret.extend(pool.map(func, chunk))
            if progress is not None:
                progress.update(1)
    finally:
        # release the worker processes and the progress bar even if `func` raises
        pool.close()
        pool.join()
        if progress is not None:
            progress.close()
    return ret
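

# A minimal usage sketch (added for illustration, not part of the original gist).
# `heavy_transform` is a hypothetical placeholder for any CPU-bound per-item function;
# it is defined at module level so that it can be pickled and shipped to the worker
# processes.
def heavy_transform(x):
    return x ** 2


if __name__ == "__main__":
    # 4 workers, chunks of 4 * 10 = 40 items, with a tqdm progress bar over the chunks
    results = map_async(heavy_transform, range(1000), processes=4, show_progress=True)
    print(results[:10])  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]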