Last active
December 12, 2022 22:48
-
-
Save sbealer/31f47d20a75ddb79605c to your computer and use it in GitHub Desktop.
Python Multiprocessing/Parallel/Multi Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import multiprocessing | |
import random | |
import time | |
from services import logger | |
LOG = logger.get_logger(os.path.basename(__file__)) | |
# Human-readable label for this example job (used for reference/logging only).
proc_name = 'Parallel Processing Example using multiprocessing module'
# Maximum attempts per task before giving up.
max_retries = 5  # Can't use the retry module here as its mechanism doesn't jive with our parallel needs.
####################################################################################################################### | |
def pool_worker_process(in_data):
    """Process one unit of work, retrying up to ``max_retries`` times on failure.

    Args:
        in_data: dict carrying at least a "worker_name" key identifying the
            worker (see run() for how the list of these dicts is built).

    Returns:
        str: "<worker_name>: Success" on success, or "Error: <message>" when
        every attempt raised (pool.map collects these so the parent can
        inspect them — no exception ever escapes this function).
    """
    task_stat = "Error: Unknown error"
    for attempt in range(1, max_retries + 1):
        try:
            worker_name = in_data["worker_name"]
            #####################################
            # Do something with the data passed in
            sleep_time = random.randint(1, 15)
            LOG.info(f"Worker: {worker_name}. Attempt: {attempt}. Simulating work by sleeping for {sleep_time} seconds.")
            time.sleep(sleep_time)
            #####################################
            # Cause a fake error for one of the processes
            if worker_name == 'Worker 3':
                LOG.info(f"{worker_name}: Crap! I'm dead. Arrrrggh.")
                raise Exception(f"{worker_name}: Something broke.")
            LOG.info(f"{worker_name} process successful.")
            task_stat = f"{worker_name}: Success"
            break
        except Exception as e:
            # To allow continued execution of all processes, every exception
            # here must be handled: record the failure text and retry. The
            # last attempt's error message is what the parent sees.
            task_stat = 'Error: ' + str(e)
    return task_stat
####################################################################################################################### | |
def run():
    """Fan the example workload out across a process pool and report results."""
    LOG.info("Process beginning")
    # Generate fake data list for example. Normally this would be a list of
    # sites, file names, etc.
    data_for_workers = [{"worker_name": f"Worker {x}"} for x in range(1, 15)]
    # Here is where you can limit your pool size to either the list size
    # (process all work simultaneously), or a smaller number to break the
    # work up in chunks.
    pool_size = 5  # len(data_for_workers)
    # Context manager guarantees the pool's workers are cleaned up even if
    # map() raises; close()/join() still wait for normal completion first.
    with multiprocessing.Pool(processes=pool_size, maxtasksperchild=1) as pool:
        # Map each process worker to a value in data_for_workers & start.
        pool_outputs = pool.map(pool_worker_process, data_for_workers)
        # Prevent any more jobs from being added to the pool, then wait.
        pool.close()
        pool.join()
    # Once processes are done, everything underneath runs in the parent.
    # Check our workers' results for errors.
    errors_exist = any('Error' in str(rec) for rec in pool_outputs)
    if errors_exist:
        # If at least one error exists, we should probably do something about it.
        LOG.info("Errors detected in one or more processes. Handling error by sending email, etc.")
        LOG.info(str(pool_outputs))
    else:
        LOG.info("All data elements processed successfully.")
    LOG.info("Complete.")
# Standard script entry point: run only when executed directly, not on import
# (also required so multiprocessing's child processes can import this module safely).
if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.