-
-
Save dirkakrid/d9d6bda30287f488ab0afa73d94f6b0f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
try: | |
import Queue as queue | |
except ImportError: | |
import queue | |
import threading | |
class QueueHandler(logging.Handler):
    """
    Logging handler that sends events to a queue.

    Typically, it would be used together with a multiprocessing Queue to
    centralise logging to file in one process (in a multi-process
    application), so as to avoid file write contention between processes.

    This code is new in Python 3.2, but this class can be copy pasted into
    user code for use with earlier Python versions.
    """

    def __init__(self, queue):
        """
        Initialise an instance, using the passed queue.

        The queue object is stored as ``self.queue``; the base
        implementation only requires it to provide ``put_nowait``.
        """
        logging.Handler.__init__(self)
        self.queue = queue

    def enqueue(self, record):
        """
        Enqueue a record.

        The base implementation uses put_nowait, so whatever that call
        raises propagates to the caller. You may want to override this
        method if you want to use blocking, timeouts or custom queue
        implementations.
        """
        self.queue.put_nowait(record)

    def prepare(self, record):
        """
        Prepare a record for queuing and return the object to enqueue.

        The base implementation formats the record to merge the message
        and arguments, and removes unpickleable items from the record
        in-place; the same (mutated) record object is returned.

        You might want to override this method if you want to convert
        the record to a dict or JSON string, or send a modified copy
        of the record while leaving the original intact.
        """
        # The format operation gets traceback text into record.exc_text
        # (if there's exception data), and also puts the message into
        # record.message. We can then use this to replace the original
        # msg + args, as these might be unpickleable. We also zap the
        # exc_info attribute, as it's no longer needed and, if not None,
        # will typically not be pickleable.
        self.format(record)
        record.msg = record.message
        record.args = None
        record.exc_info = None
        return record

    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue, preparing it for pickling first.
        Any ``Exception`` raised while preparing or enqueuing is passed to
        ``handleError``; other ``BaseException`` subclasses (for example
        ``KeyboardInterrupt`` and ``SystemExit``) propagate to the caller.
        """
        try:
            self.enqueue(self.prepare(record))
        except Exception:
            # Catching Exception rather than using a bare except keeps
            # interpreter-control exceptions out of the error handler.
            self.handleError(record)
class QueueListener(object):
    """
    Threaded listener that moves LogRecords from a queue to handlers.

    This class implements an internal threaded listener which watches for
    LogRecords being added to a queue, removes them and passes them to a
    list of handlers for processing.
    """

    # Object placed on the queue by stop() to tell the monitor thread to exit.
    _sentinel = None

    def __init__(self, queue, *handlers):
        """
        Initialise an instance with the specified queue and
        handlers.
        """
        self.queue = queue
        self.handlers = handlers
        self._stop = threading.Event()
        self._thread = None

    def dequeue(self, block):
        """
        Dequeue a record and return it, optionally blocking.

        The base implementation uses get. You may want to override this method
        if you want to use timeouts or work with custom queue implementations.
        """
        return self.queue.get(block)

    def start(self):
        """
        Start the listener.

        This starts up a background daemon thread to monitor the queue for
        LogRecords to process.
        """
        # Reset the stop flag so that a listener which was stopped earlier
        # can be started again; otherwise the new thread would exit at once.
        self._stop.clear()
        self._thread = t = threading.Thread(target=self._monitor)
        t.daemon = True
        t.start()

    def prepare(self, record):
        """
        Prepare a record for handling.

        This method just returns the passed-in record. You may want to
        override this method if you need to do any custom marshalling or
        manipulation of the record before passing it to the handlers.
        """
        return record

    def handle(self, record):
        """
        Handle a record.

        This just loops through the handlers offering them the record
        to handle.
        """
        record = self.prepare(record)
        for handler in self.handlers:
            handler.handle(record)

    def _consume(self, block, has_task_done):
        """
        Dequeue one item and process it.

        Returns False if the item was the sentinel, True otherwise. Whatever
        ``dequeue`` raises (for example ``queue.Empty``) propagates. Every
        dequeued item, including the sentinel, is acknowledged with
        ``task_done`` when the queue supports it, so that ``queue.join()``
        is not left waiting on the sentinel.
        """
        record = self.dequeue(block)
        is_sentinel = record is self._sentinel
        if not is_sentinel:
            self.handle(record)
        if has_task_done:
            self.queue.task_done()
        return not is_sentinel

    def _monitor(self):
        """
        Monitor the queue for records, and ask the handler
        to deal with them.

        This method runs on a separate, internal thread.
        The thread will terminate if it sees a sentinel object in the queue.
        """
        has_task_done = hasattr(self.queue, 'task_done')
        while not self._stop.is_set():
            try:
                if not self._consume(True, has_task_done):
                    break
            except queue.Empty:
                # Only reachable with a dequeue override that times out;
                # loop round and re-check the stop flag.
                pass
        # There might still be records in the queue.
        while True:
            try:
                if not self._consume(False, has_task_done):
                    break
            except queue.Empty:
                break

    def stop(self):
        """
        Stop the listener.

        This asks the thread to terminate, and then waits for it to do so.
        Note that if you don't call this before your application exits, there
        may be some records still left on the queue, which won't be processed.

        Calling this when the listener is not running is a no-op. If the
        sentinel cannot be enqueued, the exception from ``put_nowait``
        propagates after the stop flag has been set.
        """
        thread = self._thread
        if thread is None:
            return
        try:
            # Enqueue the sentinel *before* raising the stop flag: once the
            # monitor observes the flag, the sentinel is already on the queue
            # and gets consumed rather than being left behind.
            self.queue.put_nowait(self._sentinel)
        finally:
            self._stop.set()
        thread.join()
        self._thread = None
def main():
    """
    Demonstrate QueueHandler and QueueListener working together.

    A QueueHandler is attached to the root logger and a QueueListener
    forwards the queued records to a StreamHandler. The listener is always
    stopped and the QueueHandler detached from the root logger on exit, even
    if logging raises, so repeated calls do not accumulate handlers.
    """
    q = queue.Queue(-1)
    qh = QueueHandler(q)
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter('%(threadName)s: %(message)s'))
    ql = QueueListener(q, h)
    root = logging.getLogger()
    root.addHandler(qh)
    ql.start()
    try:
        # The log output will display the thread which generated
        # the event (the main thread) rather than the internal
        # thread which monitors the internal queue. This is what
        # you want to happen.
        root.warning('Look out!')
    finally:
        # Detach first so nothing new is queued, then let the listener
        # process what is already on the queue and terminate.
        root.removeHandler(qh)
        ql.stop()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment