Last active
April 26, 2023 08:29
-
-
Save ald2004/4c2de9b651cba335596aa924284e2405 to your computer and use it in GitHub Desktop.
python logger.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import sys | |
import time | |
import atexit | |
from termcolor import colored | |
import functools | |
from fvcore.common.file_io import PathManagerBase | |
from fvcore.common.config import CfgNode as _CfgNode | |
import functools | |
import inspect | |
from fvcore.common.config import CfgNode as _CfgNode | |
from fvcore.common.file_io import PathManager | |
import yaml | |
from logging.handlers import TimedRotatingFileHandler | |
PathManager=PathManagerBase() | |
class _ColorfulFormatter(logging.Formatter): | |
def __init__(self, *args, **kwargs): | |
self._root_name = kwargs.pop("root_name") + "." | |
self._abbrev_name = kwargs.pop("abbrev_name", "") | |
if len(self._abbrev_name): | |
self._abbrev_name = self._abbrev_name + "." | |
super(_ColorfulFormatter, self).__init__(*args, **kwargs) | |
def formatMessage(self, record): | |
record.name = record.name.replace(self._root_name, self._abbrev_name) | |
log = super(_ColorfulFormatter, self).formatMessage(record) | |
if record.levelno == logging.WARNING: | |
prefix = colored("WARNING", "red", attrs=["blink"]) | |
elif record.levelno == logging.ERROR or record.levelno == logging.CRITICAL: | |
prefix = colored("ERROR", "red", attrs=["blink", "underline"]) | |
else: | |
return log | |
return prefix + " " + log | |
# cache the opened file object, so that different calls to `setup_logger` | |
# with the same file name can safely write to the same file. | |
@functools.lru_cache(maxsize=None) | |
def _cached_log_stream(filename): | |
io = PathManager.open(filename, "a") | |
atexit.register(io.close) | |
return io | |
@functools.lru_cache() # so that calling setup_logger multiple times won't add many handlers | |
def setup_logger( | |
output=None, distributed_rank=0, *, color=True, name="detectron2", abbrev_name=None | |
): | |
""" | |
Initialize the detectron2 logger and set its verbosity level to "DEBUG". | |
Args: | |
output (str): a file name or a directory to save log. If None, will not save log file. | |
If ends with ".txt" or ".log", assumed to be a file name. | |
Otherwise, logs will be saved to `output/log.txt`. | |
name (str): the root module name of this logger | |
abbrev_name (str): an abbreviation of the module, to avoid long names in logs. | |
Set to "" to not log the root module in logs. | |
By default, will abbreviate "detectron2" to "d2" and leave other | |
modules unchanged. | |
Returns: | |
logging.Logger: a logger | |
""" | |
logger = logging.getLogger(name) | |
logger.setLevel(logging.DEBUG) | |
logger.propagate = False | |
if abbrev_name is None: | |
abbrev_name = "d2" if name == "detectron2" else name | |
plain_formatter = logging.Formatter( | |
"[%(asctime)s] %(name)s %(module)s %(lineno)d %(levelname)s: %(message)s", datefmt="%m/%d %H:%M:%S" | |
) | |
# stdout logging: master only | |
if distributed_rank == 0: | |
ch = logging.StreamHandler(stream=sys.stdout) | |
ch.setLevel(logging.DEBUG) | |
if color: | |
formatter = _ColorfulFormatter( | |
colored("[%(asctime)s %(name)s %(filename)s %(module)s %(lineno)d]: ", "green") + "%(message)s", | |
datefmt="%m/%d %H:%M:%S", | |
root_name=name, | |
abbrev_name=str(abbrev_name), | |
) | |
else: | |
formatter = plain_formatter | |
ch.setFormatter(formatter) | |
logger.addHandler(ch) | |
# file logging: all workers | |
if output is not None: | |
if output.endswith(".txt") or output.endswith(".log"): | |
filename = output | |
else: | |
filename = os.path.join(output, "log.txt") | |
if distributed_rank > 0: | |
filename = filename + ".rank{}".format(distributed_rank) | |
try: | |
PathManager.mkdirs(os.path.dirname(filename)) | |
except: | |
PathManager.mkdirs("logs") | |
fh = logging.handlers.TimedRotatingFileHandler(filename=filename, when='MIDNIGHT', interval=1, backupCount=14) | |
fh.setLevel(logging.DEBUG) | |
fh.setFormatter(plain_formatter) | |
logger.addHandler(fh) | |
return logger | |
_C = _CfgNode() | |
def get_cfg(filename: str, allow_unsafe: bool = False): | |
cfg = _C.clone() | |
try: | |
with open(filename,'r') as fid: | |
for k,v in yaml.safe_load(fid).items(): | |
if not hasattr(cfg, k): | |
setattr(cfg, k, v) | |
except: | |
pass | |
cfg.merge_from_file(filename, allow_unsafe=allow_unsafe) | |
return cfg | |
logger = setup_logger(name='cq_soil') | |
def get_logger(): | |
return logger | |
class CfgNode(_CfgNode): | |
""" | |
The same as `fvcore.common.config.CfgNode`, but different in: | |
1. Use unsafe yaml loading by default. | |
Note that this may lead to arbitrary code execution: you must not | |
load a config file from untrusted sources before manually inspecting | |
the content of the file. | |
2. Support config versioning. | |
When attempting to merge an old config, it will convert the old config automatically. | |
""" | |
@classmethod | |
def _open_cfg(cls, filename): | |
return PathManager.open(filename, "r") | |
# Note that the default value of allow_unsafe is changed to True | |
def merge_from_file(self, cfg_filename: str, allow_unsafe: bool = True) -> None: | |
assert PathManager.isfile(cfg_filename), f"Config file '{cfg_filename}' does not exist!" | |
loaded_cfg = self.load_yaml_with_base(cfg_filename, allow_unsafe=allow_unsafe) | |
loaded_cfg = type(self)(loaded_cfg) | |
# defaults.py needs to import CfgNode | |
latest_ver = _C.VERSION | |
assert ( | |
latest_ver == self.VERSION | |
), "CfgNode.merge_from_file is only allowed on a config object of latest version!" | |
# logger = logging.getLogger(__name__) | |
logger = setup_logger(name='person_track') | |
loaded_ver = loaded_cfg.get("VERSION", None) | |
logger.debug(f"loaded_ver is: {loaded_ver}") | |
assert loaded_ver == self.VERSION, "Cannot merge a v{} config into a v{} config.".format( | |
loaded_ver, self.VERSION | |
) | |
if loaded_ver == self.VERSION: | |
self.merge_from_other_cfg(loaded_cfg) | |
def dump(self, *args, **kwargs): | |
""" | |
Returns: | |
str: a yaml string representation of the config | |
""" | |
# to make it show up in docs | |
return super().dump(*args, **kwargs) | |
def configurable(init_func=None, *, from_config=None): | |
""" | |
Decorate a function or a class's __init__ method so that it can be called | |
with a :class:`CfgNode` object using a :func:`from_config` function that translates | |
:class:`CfgNode` to arguments. | |
Examples: | |
:: | |
# Usage 1: Decorator on __init__: | |
class A: | |
@configurable | |
def __init__(self, a, b=2, c=3): | |
pass | |
@classmethod | |
def from_config(cls, cfg): # 'cfg' must be the first argument | |
# Returns kwargs to be passed to __init__ | |
return {"a": cfg.A, "b": cfg.B} | |
a1 = A(a=1, b=2) # regular construction | |
a2 = A(cfg) # construct with a cfg | |
a3 = A(cfg, b=3, c=4) # construct with extra overwrite | |
# Usage 2: Decorator on any function. Needs an extra from_config argument: | |
@configurable(from_config=lambda cfg: {"a: cfg.A, "b": cfg.B}) | |
def a_func(a, b=2, c=3): | |
pass | |
a1 = a_func(a=1, b=2) # regular call | |
a2 = a_func(cfg) # call with a cfg | |
a3 = a_func(cfg, b=3, c=4) # call with extra overwrite | |
Args: | |
init_func (callable): a class's ``__init__`` method in usage 1. The | |
class must have a ``from_config`` classmethod which takes `cfg` as | |
the first argument. | |
from_config (callable): the from_config function in usage 2. It must take `cfg` | |
as its first argument. | |
""" | |
def check_docstring(func): | |
if func.__module__.startswith("detectron2."): | |
assert ( | |
func.__doc__ is not None and "experimental" in func.__doc__.lower() | |
), f"configurable {func} should be marked experimental" | |
if init_func is not None: | |
assert ( | |
inspect.isfunction(init_func) | |
and from_config is None | |
and init_func.__name__ == "__init__" | |
), "Incorrect use of @configurable. Check API documentation for examples." | |
check_docstring(init_func) | |
@functools.wraps(init_func) | |
def wrapped(self, *args, **kwargs): | |
try: | |
from_config_func = type(self).from_config | |
except AttributeError as e: | |
raise AttributeError( | |
"Class with @configurable must have a 'from_config' classmethod." | |
) from e | |
if not inspect.ismethod(from_config_func): | |
raise TypeError("Class with @configurable must have a 'from_config' classmethod.") | |
if _called_with_cfg(*args, **kwargs): | |
explicit_args = _get_args_from_config(from_config_func, *args, **kwargs) | |
init_func(self, **explicit_args) | |
else: | |
init_func(self, *args, **kwargs) | |
return wrapped | |
else: | |
if from_config is None: | |
return configurable # @configurable() is made equivalent to @configurable | |
assert inspect.isfunction( | |
from_config | |
), "from_config argument of configurable must be a function!" | |
def wrapper(orig_func): | |
check_docstring(orig_func) | |
@functools.wraps(orig_func) | |
def wrapped(*args, **kwargs): | |
if _called_with_cfg(*args, **kwargs): | |
explicit_args = _get_args_from_config(from_config, *args, **kwargs) | |
return orig_func(**explicit_args) | |
else: | |
return orig_func(*args, **kwargs) | |
return wrapped | |
return wrapper | |
def _get_args_from_config(from_config_func, *args, **kwargs): | |
""" | |
Use `from_config` to obtain explicit arguments. | |
Returns: | |
dict: arguments to be used for cls.__init__ | |
""" | |
signature = inspect.signature(from_config_func) | |
if list(signature.parameters.keys())[0] != "cfg": | |
if inspect.isfunction(from_config_func): | |
name = from_config_func.__name__ | |
else: | |
name = f"{from_config_func.__self__}.from_config" | |
raise TypeError(f"{name} must take 'cfg' as the first argument!") | |
support_var_arg = any( | |
param.kind in [param.VAR_POSITIONAL, param.VAR_KEYWORD] | |
for param in signature.parameters.values() | |
) | |
if support_var_arg: # forward all arguments to from_config, if from_config accepts them | |
ret = from_config_func(*args, **kwargs) | |
else: | |
# forward supported arguments to from_config | |
supported_arg_names = set(signature.parameters.keys()) | |
extra_kwargs = {} | |
for name in list(kwargs.keys()): | |
if name not in supported_arg_names: | |
extra_kwargs[name] = kwargs.pop(name) | |
ret = from_config_func(*args, **kwargs) | |
# forward the other arguments to __init__ | |
ret.update(extra_kwargs) | |
return ret | |
def _called_with_cfg(*args, **kwargs): | |
""" | |
Returns: | |
bool: whether the arguments contain CfgNode and should be considered | |
forwarded to from_config. | |
""" | |
if len(args) and isinstance(args[0], _CfgNode): | |
return True | |
if isinstance(kwargs.pop("cfg", None), _CfgNode): | |
return True | |
# `from_config`'s first argument is forced to be "cfg". | |
# So the above check covers all cases. | |
return False | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment