Skip to content

Instantly share code, notes, and snippets.

@TheLoneNut
Created April 29, 2021 15:14
Show Gist options
  • Select an option

  • Save TheLoneNut/153002aed1e38fba862bd24b6598f8cc to your computer and use it in GitHub Desktop.

Select an option

Save TheLoneNut/153002aed1e38fba862bd24b6598f8cc to your computer and use it in GitHub Desktop.
class simpleMultiprocessing:
    '''
    This class makes multiprocessing easy: it splits a list into contiguous
    chunks and hands each chunk to a worker process.

    :param elements: A list of elements that can be split in smaller chunks and processed in parallel.
    :param f_map: A function which takes a list of elements (normally a sublist of "elements")
        and processes it. It is pickled for the worker, so it must be picklable
        (e.g. a module-level function or builtin).
    :param f_reduce: [Optional] A callback called in the parent process each time f_map
        returns from processing a sublist. It receives f_map's return value.
    :param nProcesses: [Optional] Number of processes to spawn. Defaults to twice the
        CPU_LIMIT environment variable when it is set, otherwise twice os.cpu_count().
    :param verbose: [Optional] When set to True, displays the steps of multiprocessing.
    '''
    def __init__(self, elements, f_map, f_reduce=None, nProcesses=None, verbose=True):
        # Compute the worker-count default lazily. The previous default,
        # int(2.*float(os.getenv('CPU_LIMIT'))), ran at class-definition time
        # and raised TypeError whenever CPU_LIMIT was not set.
        if nProcesses is None:
            cpu_limit = os.getenv('CPU_LIMIT')
            if cpu_limit:
                nProcesses = max(1, int(2. * float(cpu_limit)))
            else:
                nProcesses = max(1, 2 * (os.cpu_count() or 1))
        n_elements = len(elements)
        # Evenly sized contiguous chunk boundaries; pure integer arithmetic
        # matches the previous int(np.linspace(0, n, p+1)) values without
        # needing numpy or float truncation.
        boundaries = [i * n_elements // nProcesses for i in range(nProcesses + 1)]
        pool = mp.Pool(processes=nProcesses)
        for p in range(nProcesses):
            start, stop = boundaries[p], boundaries[p + 1]
            if verbose:
                print('simpleMultiprocessing::Creating a process for elements {}-{}'.format(start, stop - 1))
            # Callbacks run in the parent process, so f_reduce need not be picklable.
            if f_reduce:
                pool.apply_async(f_map, args=[elements[start:stop]], callback=f_reduce)
            else:
                pool.apply_async(f_map, args=[elements[start:stop]])
        pool.close()
        if verbose:
            print('simpleMultiprocessing::All jobs submitted')
        # join() also waits for the result-handler thread, so all f_reduce
        # callbacks have fired by the time __init__ returns.
        pool.join()
        if verbose:
            print('simpleMultiprocessing::All jobs ended')
def _f_map_capsule(f, elements):
results = []
for item in elements:
try:
result = f([item])
except:
tmp = {
'return_status': 'exception',
'return_error': sys.exc_info()[1],
'processed_element': item
}
else:
tmp = {
'return_status': 'successful',
'return_result': result,
'processed_element': item
}
results.append(tmp)
return results
def _f_reduce_capsule(f, verbose, results):
if results:
for result in results:
if result and 'return_status' in result:
if result['return_status']=='exception':
print('An error occured...')
if 'processed_element' in result: print(' while processing item: {}'.format(result['processed_element']))
if 'return_error' in result: print(' exception: {}'.format(result['return_error']))
elif result['return_status']=='successful':
if verbose=='extra':
print('Received results...')
if 'processed_element' in result: print(' for item: {}'.format(result['processed_element']))
if 'return_result' in result: print(' result: {}'.format(result['return_result']))
if f != None: f(result['return_result'])
class protectedMultiprocessing:
    '''
    This class makes multiprocessing easy and adds a protection layer which
    helps debugging which element failed: each element is processed inside
    _f_map_capsule, so one failing element does not abort its chunk.

    :param elements: A list of elements that can be split in smaller chunks and processed in parallel.
    :param f_map: A function which takes a list of elements (normally a sublist of "elements")
        and processes it. It is pickled for the worker, so it must be picklable.
    :param f_reduce: [Optional] A callback called in the parent process each time a chunk
        finishes. It receives the unwrapped per-element results via _f_reduce_capsule.
    :param nProcesses: [Optional] Number of processes to spawn. Defaults to twice the
        CPU_LIMIT environment variable when it is set, otherwise twice os.cpu_count().
    :param verbose: [Optional] When set to True, displays the steps of multiprocessing;
        when set to 'extra', _f_reduce_capsule also prints every successful result.
    '''
    def __init__(self, elements, f_map, f_reduce=None, nProcesses=None, verbose=True):
        # Compute the worker-count default lazily. The previous default,
        # int(2.*float(os.getenv('CPU_LIMIT'))), ran at class-definition time
        # and raised TypeError whenever CPU_LIMIT was not set.
        if nProcesses is None:
            cpu_limit = os.getenv('CPU_LIMIT')
            if cpu_limit:
                nProcesses = max(1, int(2. * float(cpu_limit)))
            else:
                nProcesses = max(1, 2 * (os.cpu_count() or 1))
        n_elements = len(elements)
        # Evenly sized contiguous chunk boundaries; pure integer arithmetic
        # matches the previous int(np.linspace(0, n, p+1)) values.
        boundaries = [i * n_elements // nProcesses for i in range(nProcesses + 1)]
        # Bind the user functions into the capsule wrappers once, up front.
        f_map_capsule = partial(_f_map_capsule, f_map)
        f_reduce_capsule = partial(_f_reduce_capsule, f_reduce, verbose)
        pool = mp.Pool(processes=nProcesses)
        for p in range(nProcesses):
            start, stop = boundaries[p], boundaries[p + 1]
            if verbose:
                print('protectedMultiprocessing::Creating a process for elements {}-{}'.format(start, stop - 1))
            if f_reduce:
                pool.apply_async(f_map_capsule, args=[elements[start:stop]], callback=f_reduce_capsule)
            else:
                # NOTE(review): with no f_reduce the capsule results — including
                # captured exceptions — are discarded, so failures go unreported.
                pool.apply_async(f_map_capsule, args=[elements[start:stop]])
        pool.close()
        if verbose:
            print('protectedMultiprocessing::All jobs submitted')
        pool.join()
        if verbose:
            print('protectedMultiprocessing::All jobs ended')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment