Skip to content

Instantly share code, notes, and snippets.

@andyfaff
Created September 7, 2018 00:34
Show Gist options
  • Select an option

  • Save andyfaff/fcd91e928e5689c554bcfe2e974c0c58 to your computer and use it in GitHub Desktop.

Select an option

Save andyfaff/fcd91e928e5689c554bcfe2e974c0c58 to your computer and use it in GitHub Desktop.
Parallel processing pool co-routine
from joblib import Parallel, delayed
class PoolWrapper(object):
    """Give a serial map, a user-supplied pool and a joblib worker one ``map`` API.

    Parameters
    ----------
    pool : int or object with a ``map`` method, optional
        * An object exposing ``map`` is used as-is; it is borrowed, so
          ``close()`` never closes it.
        * ``-1`` requests as many processors as possible and a value greater
          than 1 requests that many; in both cases a worker generator is
          created with ``coroutine(n_jobs)`` and is owned (and closed) by
          this wrapper.
        * Any other integer (``1``, ``0``, values below ``-1``) falls back to
          the builtin ``map``.

    The wrapper is a context manager; leaving the ``with`` block calls
    ``close()``.
    """

    def __init__(self, pool=-1):
        self.pool = None
        self._mapfunc = map
        self._own_pool = False

        if hasattr(pool, 'map'):
            # Caller-supplied pool: use its map and leave its lifetime alone.
            self.pool = pool
            self._mapfunc = self.pool.map
            return

        # user supplies a number
        n_jobs = int(pool)
        if n_jobs == -1 or n_jobs > 1:
            # -1: use as many processors as possible
            # >1: use the number of processors requested
            self.pool = coroutine(n_jobs)
            # Run the generator up to its first yield so it can accept send().
            next(self.pool)
            self._own_pool = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Shut down the worker generator if this wrapper created it."""
        if self._own_pool:
            self.pool.close()

    def map(self, func, iterable):
        """Apply ``func`` to every item of ``iterable``.

        Returns whatever the underlying mapper returns: the value yielded by
        the owned worker generator, the result of the supplied pool's
        ``map``, or the builtin ``map`` result for the serial fallback.
        """
        # only accept one iterable because that's all Pool.map accepts
        if not self._own_pool:
            return self._mapfunc(func, iterable)

        result = self.pool.send((func, iterable))
        # Advance the worker once more so it is guaranteed to be waiting for
        # a task.  With a worker that pauses on a separate "result" yield, a
        # send() issued straight after a result is swallowed and returns
        # None, which made every second map() call return None.
        next(self.pool)
        return result
def coroutine(n_jobs):
    """Generator that runs ``(func, iterable)`` tasks on one joblib ``Parallel``.

    Usage: call ``next()`` once to start it, then ``send((func, iterable))``
    for each task; ``send`` returns the value produced by calling the
    ``Parallel`` instance on ``delayed(func)(x)`` for every ``x`` in
    ``iterable``.  Resuming with ``next()`` / ``send(None)`` between tasks is
    harmless and yields ``None``.  ``close()`` ends the generator and leaves
    the ``Parallel`` context.

    Parameters
    ----------
    n_jobs : int
        Passed straight through as ``Parallel(n_jobs=n_jobs)``.
    """
    with Parallel(n_jobs=n_jobs) as p:
        result = None
        while True:
            # Single receive point: hand back the previous result and wait
            # for the next task in the same yield.  The earlier two-yield
            # form discarded any task sent while it was paused at the
            # result yield, so every second send() returned None.
            task = yield result
            # Never replay (or keep alive) a result that was already handed out.
            result = None
            if task is None:
                # Plain next()/send(None): nothing to run, keep waiting.
                continue
            func, iterable = task
            result = p(delayed(func)(x) for x in iterable)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment