Skip to content

Instantly share code, notes, and snippets.

@rudolfbyker
Created October 1, 2024 20:00
Show Gist options
  • Save rudolfbyker/9f3745ed86e8284b5e986523dd7c4a70 to your computer and use it in GitHub Desktop.
Save rudolfbyker/9f3745ed86e8284b5e986523dd7c4a70 to your computer and use it in GitHub Desktop.
Combining logs from multiple processes in Python
import multiprocessing
from logging import LogRecord, getLogger, Logger, DEBUG
from logging.handlers import QueueHandler
from queue import Queue
from typing import Optional, TypedDict, List
from .log_combiner import log_combiner_thread
logger = getLogger(__name__)
class JobArgs(TypedDict):
    """Arguments handed to a single `do_job` call."""

    # Name of the job; `do_job` includes it in its log message.
    name: str
    # Queue that `do_job` attaches a `QueueHandler` to, so that its log
    # records travel back to whoever consumes the queue.
    logging_queue: Queue[Optional[LogRecord]]
def example() -> None:
    """
    Run jobs in a process pool and funnel their log records into this module's logger.

    A single queue is shared by all jobs. While the pool is working, a combiner
    thread (see `log_combiner_thread`) drains that queue into `logger`.
    """
    # The queue is created through a manager because it is passed to the pool
    # workers as part of each job's arguments.
    manager = multiprocessing.Manager()
    logging_queue: Queue[Optional[LogRecord]] = manager.Queue()

    jobs: List[JobArgs] = [
        JobArgs(name="foo", logging_queue=logging_queue),
        # NOTE: literal placeholder kept from the example; substitute further
        # `JobArgs` entries here before running.
        ...
    ]

    with log_combiner_thread(source=logging_queue, destination=logger):
        with multiprocessing.Pool(processes=2) as pool:
            # Materialising the iterator waits for every job to finish.
            results = list(pool.imap(do_job, jobs))

            # Shut the pool down gracefully before the `with` block exits.
            logger.debug("Closing multiprocess pool...")
            pool.close()

            logger.debug("Waiting for processes to complete...")
            pool.join()

            logger.debug("Multiprocess pool closed and emptied.")
def do_job(args: JobArgs) -> None:
    """
    Run one job and report progress through the shared logging queue.

    Args:
        args: The job's name and the queue its log records are sent to.
    """
    # A dedicated logger whose only handler forwards records onto the queue,
    # from where the main process picks them up.
    queue_handler = QueueHandler(args["logging_queue"])
    queue_handler.setLevel(DEBUG)

    job_logger = Logger(__name__, DEBUG)
    job_logger.addHandler(queue_handler)

    job_logger.info(f"Hello from job {args['name']}!")
import threading
from contextlib import contextmanager
from logging import LogRecord, Logger
from queue import Queue
from typing import Optional, Generator
@contextmanager
def log_combiner_thread(
    *,
    source: Queue[Optional[LogRecord]],
    destination: Logger,
) -> Generator[threading.Thread, None, None]:
    """
    Create a thread that consumes log messages from a multiprocessing queue and sends them to a logger.

    The thread runs for the duration of the `with` block. When the block exits
    (normally or through an exception), a `None` sentinel is put on the queue
    and the thread is joined.

    See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

    Args:
        source: The queue to consume from.
        destination: The logger to send messages to.

    Yields:
        The started consumer thread.
    """
    thread = threading.Thread(
        target=consume_logging_queue,
        kwargs=dict(
            source=source,
            destination=destination,
        ),
    )

    # Start the thread before entering the `try`: if starting fails there is
    # nothing to stop, and joining a thread that was never started raises
    # `RuntimeError`, which would mask the real error and leave a stray
    # sentinel on the queue.
    thread.start()
    try:
        yield thread
    finally:
        destination.debug("Waiting for multiprocess logging thread to stop...")
        source.put(None)  # This tells the logging thread to stop.
        thread.join()  # Wait for the logging thread to stop.
        destination.debug("Multiprocess logging thread stopped.")
def consume_logging_queue(
    *,
    source: Queue[Optional[LogRecord]],
    destination: Logger,
) -> None:
    """
    Drain log records from a queue into a logger until a `None` sentinel arrives.

    See https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

    Args:
        source: The queue to consume from.
        destination: The logger to send messages to.
    """
    # `source.get` blocks until an item is available; the two-argument form of
    # `iter` keeps calling it and stops as soon as it returns the `None`
    # sentinel, which is how the producer side asks this function to stop.
    for record in iter(source.get, None):
        destination.handle(record)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment