Created
March 8, 2014 17:44
-
-
Save curzona/9435729 to your computer and use it in GitHub Desktop.
Experimentation with Python progress output loggers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import time | |
import logging | |
class Logger(logging.Logger):
    """A ``logging.Logger`` with message indentation and one-line progress output.

    Regular log records are prefixed with ``indent`` spaces.  The
    ``start_progress`` / ``show_progress`` / ``end_progress`` trio writes
    directly to ``sys.stdout`` and keeps rewriting a single line (using a
    carriage return) until the progress scope is closed.

    Attributes:
        indent: number of spaces prepended to every message.
        in_progress: the message passed to ``start_progress`` while a progress
            scope is open, otherwise ``None``.
        in_progress_hanging: ``True`` while the progress text sits on a line
            that has not been terminated with a newline yet.
        last_message: the last text passed to ``show_progress``; used to pad
            shorter updates so they fully overwrite longer ones.
    """

    # Kept as a class-level default so ``Logger.indent`` remains readable.
    indent = 0

    def __init__(self, name, level=logging.NOTSET):
        """Create the logger with no indentation and no open progress scope."""
        logging.Logger.__init__(self, name, level)
        self.indent = 0
        self.in_progress = None
        self.in_progress_hanging = False
        # Initialised here so the attribute always exists, even before the
        # first start_progress() call.
        self.last_message = None

    def _log(self, level, msg, args, exc_info=None, extra=None, **kwargs):
        """Indent ``msg`` and delegate to ``logging.Logger._log``.

        Extra keyword arguments (for example ``stack_info`` / ``stacklevel``
        on current Python versions) are forwarded untouched so the standard
        logging API keeps working through this override.
        """
        # %-formatting instead of ``+`` so non-string messages (exceptions,
        # arbitrary objects) are accepted just like in the base class.
        msg = '%s%s' % (' ' * self.indent, msg)
        logging.Logger._log(self, level, msg, args, exc_info, extra, **kwargs)

    def _show_progress(self):
        """Return whether progress output should be written (always True here)."""
        return True

    def start_progress(self, msg):
        """Open a progress scope and print ``msg`` without a trailing newline."""
        assert not self.in_progress, (
            "Tried to start_progress(%r) while in_progress %r"
            % (msg, self.in_progress))
        if self._show_progress():
            sys.stdout.write(' ' * self.indent + msg)
            sys.stdout.flush()
            self.in_progress_hanging = True
        else:
            self.in_progress_hanging = False
        self.in_progress = msg
        self.last_message = None

    def end_progress(self, msg='done.'):
        """Close the progress scope, printing ``msg`` and a newline."""
        assert self.in_progress, (
            "Tried to end_progress without start_progress")
        if self._show_progress():
            if not self.in_progress_hanging:
                # Some message has been printed out since start_progress
                sys.stdout.write('...' + self.in_progress + msg + '\n')
                sys.stdout.flush()
            else:
                # These erase any messages shown with show_progress (besides
                # .'s): the first call blanks the previous text with padding,
                # the second puts the cursor right after the prefix again.
                # Must go through ``self`` -- a module-level ``logger`` only
                # exists when this file is run as a script.
                self.show_progress('')
                self.show_progress('')
                sys.stdout.write(msg + '\n')
                sys.stdout.flush()
        self.in_progress = None
        self.in_progress_hanging = False

    def show_progress(self, msg=None):
        """If we are in a progress scope, and no log messages have been
        shown, write out another '.' (when ``msg`` is None) or rewrite the
        progress line as ``<indent><start message><msg>``."""
        if not self.in_progress_hanging:
            return
        if msg is None:
            sys.stdout.write('.')
            sys.stdout.flush()
            return
        if self.last_message:
            # Pad with spaces so a shorter update wipes the previous one.
            padding = ' ' * max(0, len(self.last_message) - len(msg))
        else:
            padding = ''
        sys.stdout.write('\r%s%s%s%s' %
                         (' ' * self.indent, self.in_progress, msg, padding))
        sys.stdout.flush()
        self.last_message = msg
class ProgressConsoleHandler(logging.StreamHandler):
    """
    A handler class which allows the cursor to stay on
    one line for selected messages

    Records carrying a ``same_line`` attribute are written without a trailing
    newline and preceded by a carriage return, so consecutive ones overwrite
    each other.  Records carrying an ``indent`` attribute are control records:
    a truthy value increases the indentation by one space, a falsy value
    decreases it, and nothing is written.
    """

    # True while the last thing written was a ``same_line`` message.
    on_same_line = False
    # NOTE: the names are historical and swapped relative to the characters
    # they hold; they are kept unchanged for backward compatibility.
    linereturn = '\n'
    linefeed = '\r'
    # Current indentation (in spaces) applied to every written message.
    indent = 0

    def emit(self, record):
        """Write ``record`` to the stream, or adjust the indentation level."""
        if hasattr(record, 'indent'):
            if record.indent:
                self.indent += 1
            else:
                # Never drop below zero, otherwise an unmatched dedent would
                # silently cancel the next indent request.
                self.indent = max(0, self.indent - 1)
            return
        try:
            msg = self.format(record)
            stream = self.stream
            same_line = hasattr(record, 'same_line')
            if same_line:
                # Go back to column 0 so this message overwrites the line.
                stream.write(self.linefeed)
            elif self.on_same_line:
                # Terminate the pending progress line before a normal message.
                stream.write(self.linereturn)
            stream.write(' ' * self.indent + msg)
            if not same_line:
                stream.write(self.linereturn)
            self.on_same_line = same_line
            self.flush()
        except Exception:
            # KeyboardInterrupt / SystemExit are not Exception subclasses and
            # therefore propagate on their own.
            self.handleError(record)
def test_progress(logger):
    """Demo: simulate a 202 kB download and report it through ``logger``.

    Opens a progress scope, issues one ``show_progress`` update per simulated
    kilobyte (sleeping 10 ms between updates) and closes the scope with the
    final size.
    """
    total_kb = 202
    archive = 'logilab-common-0.60.0.tar.gz'
    logger.start_progress('Downloading %s (%dkB): ' % (archive, total_kb))
    done_kb = 0
    while done_kb < total_kb:
        logger.show_progress('%dkB downloaded' % done_kb)
        time.sleep(0.01)
        done_kb += 1
    logger.end_progress('%dkB downloaded' % total_kb)
def test_indent(logger):
    """Demo: log one message, two messages one indent level deeper, then one
    more at the original level."""
    logger.info('test1')
    # Nested section: raise the indent for the two sub-messages only.
    logger.indent += 1
    for nested in ('test1.1', 'test1.2'):
        logger.info(nested)
    logger.indent -= 1
    logger.info('test2')
if __name__ == '__main__':
    # Script entry point: build a DEBUG-level Logger wired to a plain stream
    # handler and run both demos against it.
    # NOTE: ``progress`` is created but never attached to the logger.
    progress = ProgressConsoleHandler()
    console = logging.StreamHandler()

    logger = Logger('test')
    logger.addHandler(console)
    logger.setLevel(logging.DEBUG)

    for demo in (test_progress, test_indent):
        demo(logger)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment