Created
April 29, 2021 15:14
-
-
Save TheLoneNut/153002aed1e38fba862bd24b6598f8cc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class simpleMultiprocessing:
    '''
    This class makes multiprocessing easy.

    :param elements: A list of elements that can be split in smaller chunks and processed in parallel.
    :param f_map: A function which takes a list of elements (normally a sublist of "elements") and processes it.
        It must be picklable (e.g. defined at module level) so it can be sent to worker processes.
    :param f_reduce: [Optional] A callback called each time f_map returns from processing a sublist of
        elements. It takes the return value of f_map as input and runs in the parent process.
    :param nProcesses: [Optional] Number of processes to spawn. Defaults to twice the CPU_LIMIT
        environment variable when it is set, otherwise twice the number of available processors.
    :param verbose: [Optional] When set to True, displays the steps of multiprocessing.
    '''
    def __init__(self, elements, f_map, f_reduce=None, nProcesses=None, verbose=True):
        # Resolve the worker count lazily. The original signature evaluated
        # float(os.getenv('CPU_LIMIT')) as a default argument, which raised a
        # TypeError at class-definition time whenever CPU_LIMIT was unset,
        # breaking the whole module import.
        if nProcesses is None:
            cpu_limit = os.getenv('CPU_LIMIT')
            if cpu_limit is not None:
                nProcesses = max(1, int(2. * float(cpu_limit)))
            else:
                # Docstring default: twice the number of available processors.
                nProcesses = max(1, 2 * mp.cpu_count())
        n_elements = len(elements)
        # Contiguous, evenly sized chunk boundaries; int() truncates the linspace floats.
        boundaries = [int(i) for i in np.linspace(0, n_elements, nProcesses + 1)]
        pool = mp.Pool(processes=nProcesses)
        for p in range(nProcesses):
            start = boundaries[p]
            stop = boundaries[p + 1]
            if verbose: print('simpleMultiprocessing::Creating a process for elements {}-{}'.format(start, stop - 1))
            if f_reduce:
                # The callback runs in this (parent) process once the worker finishes.
                pool.apply_async(f_map, args=[elements[start:stop]], callback=f_reduce)
            else:
                pool.apply_async(f_map, args=[elements[start:stop]])
        pool.close()
        if verbose: print('simpleMultiprocessing::All jobs submitted')
        # join() returns only after all submitted jobs (and their callbacks) completed.
        pool.join()
        if verbose: print('simpleMultiprocessing::All jobs ended')
| def _f_map_capsule(f, elements): | |
| results = [] | |
| for item in elements: | |
| try: | |
| result = f([item]) | |
| except: | |
| tmp = { | |
| 'return_status': 'exception', | |
| 'return_error': sys.exc_info()[1], | |
| 'processed_element': item | |
| } | |
| else: | |
| tmp = { | |
| 'return_status': 'successful', | |
| 'return_result': result, | |
| 'processed_element': item | |
| } | |
| results.append(tmp) | |
| return results | |
| def _f_reduce_capsule(f, verbose, results): | |
| if results: | |
| for result in results: | |
| if result and 'return_status' in result: | |
| if result['return_status']=='exception': | |
| print('An error occured...') | |
| if 'processed_element' in result: print(' while processing item: {}'.format(result['processed_element'])) | |
| if 'return_error' in result: print(' exception: {}'.format(result['return_error'])) | |
| elif result['return_status']=='successful': | |
| if verbose=='extra': | |
| print('Received results...') | |
| if 'processed_element' in result: print(' for item: {}'.format(result['processed_element'])) | |
| if 'return_result' in result: print(' result: {}'.format(result['return_result'])) | |
| if f != None: f(result['return_result']) | |
class protectedMultiprocessing:
    '''
    This class makes multiprocessing easy and adds a protection layer which
    helps debugging which element failed: each element is processed
    individually in the worker, and per-element exceptions are captured and
    reported in the parent process instead of aborting the chunk.

    :param elements: A list of elements that can be split in smaller chunks and processed in parallel.
    :param f_map: A function which takes a list of elements (normally a sublist of "elements") and processes it.
        It must be picklable (e.g. defined at module level) so it can be sent to worker processes.
    :param f_reduce: [Optional] A callback called each time f_map returns from processing a sublist of
        elements. It takes the return value of f_map as input and runs in the parent process.
    :param nProcesses: [Optional] Number of processes to spawn. Defaults to twice the CPU_LIMIT
        environment variable when it is set, otherwise twice the number of available processors.
    :param verbose: [Optional] When set to True, displays the steps of multiprocessing.
    '''
    def __init__(self, elements, f_map, f_reduce=None, nProcesses=None, verbose=True):
        # Resolve the worker count lazily. The original signature evaluated
        # float(os.getenv('CPU_LIMIT')) as a default argument, which raised a
        # TypeError at class-definition time whenever CPU_LIMIT was unset,
        # breaking the whole module import.
        if nProcesses is None:
            cpu_limit = os.getenv('CPU_LIMIT')
            if cpu_limit is not None:
                nProcesses = max(1, int(2. * float(cpu_limit)))
            else:
                # Docstring default: twice the number of available processors.
                nProcesses = max(1, 2 * mp.cpu_count())
        n_elements = len(elements)
        # Contiguous, evenly sized chunk boundaries; int() truncates the linspace floats.
        boundaries = [int(i) for i in np.linspace(0, n_elements, nProcesses + 1)]
        # Bind the user functions into the protection capsules.
        f_map_capsule = partial(_f_map_capsule, f_map)
        f_reduce_capsule = partial(_f_reduce_capsule, f_reduce, verbose)
        pool = mp.Pool(processes=nProcesses)
        for p in range(nProcesses):
            start = boundaries[p]
            stop = boundaries[p + 1]
            if verbose: print('protectedMultiprocessing::Creating a process for elements {}-{}'.format(start, stop - 1))
            # Always attach the reduce capsule: _f_reduce_capsule handles
            # f_reduce=None, and attaching it ensures captured exceptions are
            # reported even when no user callback was supplied (previously they
            # were silently discarded, defeating the protection layer).
            pool.apply_async(f_map_capsule, args=[elements[start:stop]], callback=f_reduce_capsule)
        pool.close()
        if verbose: print('protectedMultiprocessing::All jobs submitted')
        # join() returns only after all submitted jobs (and their callbacks) completed.
        pool.join()
        if verbose: print('protectedMultiprocessing::All jobs ended')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment