Skip to content

Instantly share code, notes, and snippets.

@leonrinkel
Created October 24, 2021 13:32
Show Gist options
  • Save leonrinkel/e3b51136f84a53de1e5b78e8373a683c to your computer and use it in GitHub Desktop.
Save leonrinkel/e3b51136f84a53de1e5b78e8373a683c to your computer and use it in GitHub Desktop.
Python Multiprocess Work Producer/Consumer Example
import time
import logging
import logging.handlers
import multiprocessing
number_of_work_consumers = 4
def log_handler_fn(log_queue: multiprocessing.Queue):
logging.basicConfig(level=logging.DEBUG)
while True:
log_record: logging.LogRecord = log_queue.get()
if log_record is None: break
logger = logging.getLogger(log_record.name)
logger.handle(log_record)
def work_producer_fn(
log_queue: multiprocessing.Queue,
work_queue:multiprocessing.Queue
):
logger = logging.getLogger("work_producer")
logger.addHandler(logging.handlers.QueueHandler(log_queue))
logger.setLevel(logging.DEBUG)
logger.info("started")
for i in range(10):
time.sleep(1)
work_queue.put(f"task{i}")
logger.info("finished")
def work_consumer_fn(
work_consumer_index: int,
log_queue: multiprocessing.Queue,
work_queue: multiprocessing.Queue
):
logger = logging.getLogger(f"work_consumer{work_consumer_index}")
logger.addHandler(logging.handlers.QueueHandler(log_queue))
logger.setLevel(logging.DEBUG)
logger.info("started")
while True:
work = work_queue.get()
if work is None: break
logger.info(f"processing {work}")
time.sleep(5)
logger.info("finished")
def main():
log_queue = multiprocessing.Queue()
work_queue = multiprocessing.Queue()
log_handler_process = multiprocessing.Process(
target=log_handler_fn, args=(log_queue,))
log_handler_process.start()
work_producer_process = multiprocessing.Process(
target=work_producer_fn, args=(log_queue, work_queue,))
work_producer_process.start()
work_consumer_processes = []
for work_consumer_index in range(number_of_work_consumers):
work_consumer_process = multiprocessing.Process(
target=work_consumer_fn, args=(work_consumer_index, log_queue, work_queue,))
work_consumer_process.start()
work_consumer_processes.append(work_consumer_process)
work_producer_process.join()
for _ in range(number_of_work_consumers):
work_queue.put(None)
for work_consumer_process in work_consumer_processes:
work_consumer_process.join()
log_queue.put(None)
log_handler_process.join()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment