Combining logs from multiple processes in Python
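The gist consists of two files: a usage example (below) and log_combiner.py, the module it imports. Each worker process attaches a QueueHandler that ships LogRecords over a shared multiprocessing queue; a background thread in the main process drains the queue and hands each record to an ordinary logger.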
import multiprocessing
from logging import LogRecord, getLogger, Logger, DEBUG
from logging.handlers import QueueHandler
from queue import Queue
from typing import Optional, TypedDict, List

# Assumes this file sits next to log_combiner.py inside a package.
from .log_combiner import log_combiner_thread

logger = getLogger(__name__)


class JobArgs(TypedDict):
    name: str
    logging_queue: Queue[Optional[LogRecord]]


def example() -> None:
    # A manager-backed queue proxy can be pickled and passed to pool workers;
    # a plain multiprocessing.Queue cannot be sent through Pool.imap.
    manager = multiprocessing.Manager()
    logging_queue: Queue[Optional[LogRecord]] = manager.Queue()
    jobs: List[JobArgs] = [
        JobArgs(
            name="foo",
            logging_queue=logging_queue,
        ),
        ...
    ]
    with log_combiner_thread(source=logging_queue, destination=logger):
        with multiprocessing.Pool(processes=2) as pool:
            results = list(
                pool.imap(do_job, jobs)
            )
            logger.debug("Closing multiprocess pool...")
            pool.close()
            logger.debug("Waiting for processes to complete...")
            pool.join()
            logger.debug("Multiprocess pool closed and emptied.")


def do_job(args: JobArgs) -> None:
    # Create a fresh logger that passes its messages via the multiprocessing
    # queue to the main process. Instantiating Logger directly (rather than
    # calling getLogger) avoids piling duplicate handlers onto the shared
    # module logger when the same worker process runs do_job more than once.
    logger = Logger(name=__name__, level=DEBUG)
    handler = QueueHandler(queue=args["logging_queue"])
    handler.setLevel(DEBUG)
    logger.addHandler(handler)
    logger.info(f"Hello from job {args['name']}!")
log_combiner.py:
import threading
from contextlib import contextmanager
from logging import LogRecord, Logger
from queue import Queue
from typing import Optional, Generator


@contextmanager
def log_combiner_thread(
    *,
    source: Queue[Optional[LogRecord]],
    destination: Logger,
) -> Generator[threading.Thread, None, None]:
    """
    Create a thread that consumes log messages from a multiprocessing queue and sends them to a logger.

    See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

    Args:
        source: The queue to consume from.
        destination: The logger to send messages to.
    """
    thread = threading.Thread(
        target=consume_logging_queue,
        kwargs=dict(
            source=source,
            destination=destination,
        ),
    )
    try:
        thread.start()
        yield thread
    finally:
        destination.debug("Waiting for multiprocess logging thread to stop...")
        source.put(None)  # This tells the logging thread to stop.
        thread.join()  # Wait for the logging thread to stop.
        destination.debug("Multiprocess logging thread stopped.")


def consume_logging_queue(
    *,
    source: Queue[Optional[LogRecord]],
    destination: Logger,
) -> None:
    """
    Consume log messages from a multiprocessing queue and send them to a logger.

    See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

    Args:
        source: The queue to consume from.
        destination: The logger to send messages to.
    """
    while True:
        # This blocks until a record is available.
        record = source.get()
        # We send `None` as a sentinel to tell this function to stop.
        if record is None:
            break
        destination.handle(record)
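As a design note: the standard library's logging.handlers.QueueListener implements essentially this same consume loop, including a sentinel-based shutdown, in its own background thread. A rough equivalent, assuming the logging_queue from the example above; the difference is that QueueListener dispatches records directly to the given handlers instead of through a Logger, so logger-level filtering does not apply:

import logging
from logging.handlers import QueueListener

# Forward queued records to whatever handlers are on the root logger
# (e.g. the one installed by basicConfig in the entry point above).
listener = QueueListener(
    logging_queue,
    *logging.getLogger().handlers,
    respect_handler_level=True,
)
listener.start()
try:
    ...  # run the pool, as in example()
finally:
    listener.stop()  # enqueues its own sentinel and joins the thread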