Created
October 24, 2021 13:32
-
-
Save leonrinkel/e3b51136f84a53de1e5b78e8373a683c to your computer and use it in GitHub Desktop.
Python Multiprocess Work Producer/Consumer Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import logging.handlers
import multiprocessing
import time
# Number of consumer processes main() starts; also the number of ``None``
# shutdown sentinels it places on the work queue (one per consumer).
number_of_work_consumers = 4
def log_handler_fn(log_queue: multiprocessing.Queue) -> None:
    """Receive log records from ``log_queue`` and emit them in this process.

    Configures the root logger at DEBUG level, then blocks on the queue.
    Every record received is dispatched to the logger named by the record.
    The loop ends when ``None`` is read from the queue.

    Args:
        log_queue: Queue carrying ``logging.LogRecord`` objects, terminated
            by a single ``None`` sentinel.
    """
    logging.basicConfig(level=logging.DEBUG)

    while True:
        log_record = log_queue.get()
        if log_record is None:
            # Shutdown sentinel: stop listening.
            break

        # Logger.handle() passes the record straight to the handlers; the
        # logger's own level is not re-checked here.
        logging.getLogger(log_record.name).handle(log_record)
def work_producer_fn(
    log_queue: multiprocessing.Queue,
    work_queue: multiprocessing.Queue,
    task_count: int = 10,
    interval: float = 1.0,
) -> None:
    """Produce ``task_count`` work items onto ``work_queue``.

    Log records from the ``work_producer`` logger are forwarded to
    ``log_queue`` through a ``QueueHandler``. Each work item is the string
    ``"task<i>"`` for ``i`` in ``range(task_count)``. No shutdown sentinel
    is placed on ``work_queue`` by this function.

    Args:
        log_queue: Queue that receives this function's log records.
        work_queue: Queue that receives the produced work items.
        task_count: Number of work items to produce (default 10).
        interval: Seconds to wait before producing each item (default 1.0);
            values <= 0 disable the wait.
    """
    logger = logging.getLogger("work_producer")
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.DEBUG)

    logger.info("started")

    for i in range(task_count):
        if interval > 0:
            # Simulates the time it takes to come up with a new task.
            time.sleep(interval)
        work_queue.put(f"task{i}")

    logger.info("finished")
def work_consumer_fn(
    work_consumer_index: int,
    log_queue: multiprocessing.Queue,
    work_queue: multiprocessing.Queue,
    processing_time: float = 5.0,
) -> None:
    """Consume work items from ``work_queue`` until a ``None`` sentinel.

    Log records from the ``work_consumer<index>`` logger are forwarded to
    ``log_queue`` through a ``QueueHandler``. For every item received, a
    "processing" message is logged and the function then waits
    ``processing_time`` seconds.

    Args:
        work_consumer_index: Index used to build this consumer's logger name.
        log_queue: Queue that receives this function's log records.
        work_queue: Queue of work items, terminated by ``None``.
        processing_time: Seconds spent on each item (default 5.0); values
            <= 0 disable the wait.
    """
    logger = logging.getLogger(f"work_consumer{work_consumer_index}")
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.DEBUG)

    logger.info("started")

    while True:
        work = work_queue.get()
        if work is None:
            # Shutdown sentinel: no more work for this consumer.
            break

        # Lazy %-style arguments; the formatted message is unchanged.
        logger.info("processing %s", work)
        if processing_time > 0:
            # Simulates the time it takes to process a task.
            time.sleep(processing_time)

    logger.info("finished")
def main():
    """Run the example: one log handler, one producer, several consumers.

    Start order: log handler first, then the producer, then
    ``number_of_work_consumers`` consumers.

    Shutdown order: wait for the producer, put one ``None`` sentinel per
    consumer on the work queue, wait for all consumers, and only then put
    the ``None`` sentinel on the log queue and wait for the log handler.
    """
    log_queue = multiprocessing.Queue()
    work_queue = multiprocessing.Queue()

    log_handler_process = multiprocessing.Process(
        target=log_handler_fn, args=(log_queue,))
    log_handler_process.start()

    work_producer_process = multiprocessing.Process(
        target=work_producer_fn, args=(log_queue, work_queue))
    work_producer_process.start()

    work_consumer_processes = []
    for work_consumer_index in range(number_of_work_consumers):
        work_consumer_process = multiprocessing.Process(
            target=work_consumer_fn,
            args=(work_consumer_index, log_queue, work_queue))
        work_consumer_process.start()
        work_consumer_processes.append(work_consumer_process)

    # The producer does not signal completion on the work queue itself, so
    # wait for it to exit before telling the consumers to stop.
    work_producer_process.join()

    # One sentinel per consumer: each consumer stops after reading one None.
    for _ in range(number_of_work_consumers):
        work_queue.put(None)

    for work_consumer_process in work_consumer_processes:
        work_consumer_process.join()

    # Stop the log handler last, after every process that logs to it has
    # exited.
    log_queue.put(None)
    log_handler_process.join()
# Run the example only when executed as a script. The guard also keeps
# main() from running again if a child process re-imports this module
# (as multiprocessing does under the "spawn" start method).
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment