Python Multiprocessing/Parallel/Multi Example
import os
import multiprocessing
import random
import time
from services import logger
LOG = logger.get_logger(os.path.basename(__file__))
proc_name = 'Parallel Processing Example using multiprocessing module'
max_retries = 5  # Can't use the retry module here, as its retry mechanism doesn't jibe with our parallel needs.
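# (A possible extension, not in the original: to back off between retries, the worker below could sleep
#  before its next attempt, e.g. time.sleep(2 ** attempt), instead of retrying immediately.)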
#######################################################################################################################
def pool_worker_process(in_data):
    attempt = 1
    task_stat = "Error: Unknown error"
    while attempt <= max_retries:
        try:
            # worker_name = multiprocessing.current_process().name
            worker_name = in_data["worker_name"]
            #####################################
            # Do something with the data passed in
            sleep_time = random.randint(1, 15)
            LOG.info(f"Worker: {worker_name}. Attempt: {attempt}. Simulating work by sleeping for {sleep_time} seconds.")
            time.sleep(sleep_time)
            #####################################
            # Cause a fake error for one of the processes
            if worker_name == 'Worker 3':
                LOG.info(f"{worker_name}: Crap! I'm dead. Arrrrggh.")
                raise Exception(f"{worker_name}: Something broke.")
            LOG.info(f"{worker_name} process successful.")
            task_stat = f"{worker_name}: Success"
            break
        except Exception as e:
            try:
                # To allow continued execution of all processes, all exceptions here must be handled.
                task_stat = 'Error: ' + str(e)
            except Exception:
                task_stat = 'Error'
            attempt += 1
    return task_stat
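# Why the broad exception handling above matters: if a worker lets an exception escape, pool.map()
# re-raises the first such exception in the parent process, and the return values of all other tasks
# are lost. Returning a status string instead lets every worker run to completion, so run() can
# inspect all results afterward.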
#######################################################################################################################
def run():
    LOG.info("Process beginning")
    # Generate a fake data list for the example. Normally this would be a list of sites, file names, etc.
    data_for_workers = [{"worker_name": f"Worker {x}"} for x in range(1, 15)]
    # Here is where you can limit your pool size: either the list size (process all work simultaneously)
    # or a smaller number to break the work up into chunks.
    pool_size = 5  # len(data_for_workers)
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1)
    # Set up the pool: map pool_worker_process onto each value in data_for_workers & start
    pool_outputs = pool.map(pool_worker_process, data_for_workers)
    # Prevent any more jobs from being added to the pool
    pool.close()
    pool.join()
    # Once the processes are done, this line, and everything underneath, will be executed.
    errors_exist = False
    # Here's where we check our workers for errors.
    for rec in pool_outputs:
        # If at least one error exists, we should probably do something about it.
        if 'Error' in str(rec):
            errors_exist = True
            break
    if errors_exist:
        LOG.info("Errors detected in one or more processes. Handling error by sending email, etc.")
        LOG.info(str(pool_outputs))
    else:
        LOG.info("All data elements processed successfully.")
    LOG.info("Complete.")
if __name__ == '__main__':
    run()