Last active
January 25, 2024 11:29
-
-
Save krets/a7466d0a5cce4de7fc6a5773c660801b to your computer and use it in GitHub Desktop.
Multi-use timer object
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import logging | |
class Timer: | |
""" | |
A Timer helper | |
This object can be used as an object, context or decorator. | |
Messages can be collected via logging by using the report() method. | |
## direct object | |
``` | |
timer = Timer('Inline process')) | |
# Some long running code | |
timer.stop() | |
``` | |
## Context Manager | |
``` | |
with Timer() as timer: | |
long_process() | |
timer.mark('First phase done') | |
some_other_task() | |
timer.mark('Phase two complete') | |
``` | |
## Decorator | |
``` | |
@Timer() | |
def my_fucntion(): | |
#... | |
``` | |
""" | |
_start = "START" | |
_stop = "STOP" | |
_format = "%s - %0.2fs" | |
_format_extended = _format + " (total: %0.2fs)" | |
_logger_prefix = 'krets.Timer' | |
_log_level = logging.DEBUG | |
def __init__(self, name=None, log=None, log_level=None): | |
self.name = str(id(self)) if name is None else name | |
self.log = self._get_logger() if log is None else log | |
self._log_level = self._log_level if log_level is None else log_level | |
self._events = [] | |
self.reset() | |
def is_running(self): | |
return not self._events[-1][1] == self._stop | |
def mark(self, message): | |
""" Get split time and log event """ | |
now, split_duration, full_duration = self.split() | |
event = (now, message) | |
self._events.append(event) | |
args = [message, split_duration] | |
format = self._format | |
if len(self._events) > 2: | |
args.append(full_duration) | |
format = self._format_extended | |
self.log.log(self._log_level, format, *args) | |
def split(self): | |
""" Get time since last mark and start """ | |
now = time.monotonic() | |
full_duration = now - self._events[0][0] | |
split_duration = now - self._events[-1][0] | |
return now, split_duration, full_duration | |
def reset(self): | |
self._events.clear() | |
self._events.append((time.monotonic(), self._start)) | |
@property | |
def total_seconds(self): | |
if not self.is_running(): | |
return self._events[-1][0] - self._events[0][0] | |
return self.split()[3] | |
def stop(self): | |
self.mark(self._stop) | |
def report(self): | |
rows = ["## {}".format(self)] | |
if len(self._events) > 1: | |
max_event_length = max(len(event[1]) for event in self._events[1:]) | |
rows.append("| {:<{}} | {:<8} | {:<15} |".format("Event", max_event_length, "Duration", "Total Duration")) | |
rows.append("|{}|{}|{}|".format("-" * (max_event_length + 2), "----------", "-----------------")) | |
for i in range(1, len(self._events)): | |
timestamp, event_name = self._events[i] | |
total_duration = timestamp - self._events[0][0] | |
duration = timestamp - self._events[i-1][0] | |
rows.append("| {:<{}} | {:<8.3f} | {:<15.3f} |".format(event_name, max_event_length, duration, total_duration)) | |
return "\n".join(rows) | |
def _get_logger(self): | |
return logging.getLogger('.'.join([self._logger_prefix, self.name])) | |
def __enter__(self): | |
self.reset() | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
self.stop() | |
def __call__(self, func): | |
self.name = func.__name__ | |
self.log = self._get_logger() | |
def wrapped(*args, **kwargs): | |
self.reset() | |
result = func(*args, **kwargs) | |
self.mark('Complete') | |
return result | |
return wrapped | |
def __repr__(self): | |
return "<%s.%s %0.2fs%s>" % ( | |
self.__class__.__name__, | |
self.name, self.total_seconds, | |
(' running' if self.is_running() else '')) | |
@Timer() | |
def hard_work(): | |
time.sleep(4) | |
if __name__=='__main__': | |
log = logging.getLogger(Timer._logger_prefix) | |
log.addHandler(logging.StreamHandler()) | |
log.handlers[-1].setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s')) | |
log.setLevel(logging.DEBUG) | |
with Timer('') as timer: | |
time.sleep(.5) | |
timer.mark("First sleep") | |
time.sleep(.75) | |
timer.mark("Second sleep") | |
print(timer.report()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment