Created
August 7, 2021 07:37
-
-
Save adriangb/b5ef5e0a264d3640f42ad5cf39e53cc1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import contextmanager | |
from dataclasses import dataclass, field | |
from logging import Filter, Formatter, Handler, Logger, LogRecord | |
from logging import config as logging_config_mod | |
from logging import getLogger | |
from typing import Generator, List, Optional, Sequence, Union | |
@dataclass(frozen=True) | |
class CapturedRecord: | |
records: List[LogRecord] = field(default_factory=list) | |
output: List[str] = field(default_factory=list) | |
class _CaptureHandler(Handler): | |
def __init__(self, level: Union[str, int], *, recorder: CapturedRecord, raise_errors: bool) -> None: | |
super().__init__(level=level) | |
self.recorder = recorder | |
self.raise_errors = raise_errors | |
def emit(self, record: LogRecord) -> None: | |
try: | |
msg = self.format(record) | |
except Exception: | |
if self.raise_errors: | |
raise | |
else: | |
self.handleError(record) | |
return | |
self.recorder.records.append(record) | |
self.recorder.output.append(msg) | |
@contextmanager | |
def capture_logs( | |
logger: Optional[Union[Logger, str]] = None, | |
level: Optional[Union[int, str]] = None, | |
*, | |
config: Optional[dict] = None, | |
formatter: Optional[Formatter] = None, | |
filters: Optional[Sequence[Filter]] = None, | |
raise_errors: bool = False, | |
) -> Generator[CapturedRecord, None, None]: | |
"""Capture logs and assert that at least 1 log was emitted. | |
API compatible with unittest.TestCase.assertLogs, but offering more features: | |
- Eaiser to integrate into pytest | |
- Can attatch filters and formatters | |
- Can raise logging errors instead of swallowing them | |
Parameters | |
---------- | |
logger : Optional[Union[Logger, str]], optional | |
Logger instance or logger name, by default the root logger. | |
level : Optional[Union[int, str]], optional | |
Logging level to capture at, by default logging.INFO | |
config : Optional[dict], optional | |
A logging configuration in the dictConfig format. | |
Handlers will be ignored, only formatters and filters will be taken from the config. | |
Only a single formatter must be present, and all filters will be used. | |
If this option is used, no other formatter can be specified. | |
formatter : Optional[Formatter], optional | |
A formatter to attatch to the logging handler, by default logging.Formatter | |
filters : Optional[Sequence[Filter]], optional | |
A sequence of Filters (or API compatible callables) to apply, by default None | |
Yields | |
------- | |
CapturedRecord | |
An object that provides two attributes: | |
- `records` holds the `logging.LogRecord` instances emitted | |
- `output` holds the formatted strings | |
Raises | |
------ | |
AssertionError | |
If not logs were emitted. | |
""" | |
assert not (config is not None and formatter is not None), "You cannot use both `config` and `formatter`" | |
filters = filters or [] | |
filters = list(filters) | |
if config: | |
# Note: the logging config module uses some strange key wrapping | |
# this parsing complies with it, but be careful changing it | |
configurator = logging_config_mod.dictConfigClass(config) # type: ignore | |
filter_configs = configurator.config.get("filters", {}) | |
configured_filters = [] | |
for filter in filter_configs: | |
configured_filters.append(configurator.configure_filter(filter_configs[filter])) | |
filters.extend(configured_filters) | |
formatter_configs = configurator.config.get("formatters", {}) | |
if len(formatter_configs) > 1: | |
raise ValueError("Only 1 formatter config at a time is supported") | |
formatter_name = next(iter(formatter_configs.keys())) | |
formatter = configurator.configure_formatter(formatter_configs[formatter_name]) | |
logger = logger or "" | |
level = level if level is not None else "INFO" | |
if not isinstance(logger, Logger): | |
logger = getLogger(logger) | |
original_level = logger.level | |
logger.setLevel(level) | |
recorder = CapturedRecord() | |
handler = _CaptureHandler(level=level, recorder=recorder, raise_errors=raise_errors) | |
if formatter is not None: | |
handler.setFormatter(formatter) | |
for filter in filters or []: | |
handler.addFilter(filter) | |
logger.addHandler(handler) | |
try: | |
yield recorder | |
finally: | |
logger.setLevel(original_level) | |
logger.removeHandler(handler) | |
if len(recorder.records) == 0: | |
raise AssertionError("No logs were emitted!") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment