Last active
September 28, 2023 13:12
-
-
Save wtfzambo/190d4fb1f3b00f8f9a012de269420350 to your computer and use it in GitHub Desktop.
Easily format slack messages for https://dlthub.com/ pipelines.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from abc import ABC, abstractmethod | |
from typing import Any, Literal, Sequence, cast, overload | |
from dlt.common.pipeline import LoadInfo | |
from dlt.common.storages.load_storage import LoadJobInfo | |
from slack_table import ColumnSpec, TRow | |
class BaseDataFormatter(ABC):
    """Base class that shapes a dlt ``LoadInfo`` into rows for a Slack table.

    Subclasses assign ``_COL_SPECS`` before calling ``super().__init__`` and
    implement ``_format_data`` to populate ``_formatted_data``.
    """

    _COL_SPECS: Sequence[ColumnSpec]
    _formatted_data: Sequence[TRow]
    # Hard cap on measured column content width (before the multiplier).
    _global_max_width = 50
    # Extra breathing room applied on top of the measured width.
    _width_multiplier = 1.2

    def __init__(self, data: LoadInfo):
        if not getattr(self, "_COL_SPECS", None):
            raise NotImplementedError(
                f"_COL_SPECS must be set in the __init__ method of {self.__class__.__name__}"
            )
        self.data = data

    @abstractmethod
    def _format_data(self) -> None:
        pass

    def _update_max_col_width(self) -> None:
        # Size each column to its widest cell (capped at the global max),
        # then widen slightly so values are not flush against the next column.
        for spec in self._COL_SPECS:
            widest = spec.get("width", 0)
            key = spec["data_index"]
            for row in self._formatted_data:
                widest = min(max(widest, len(row[key])), self._global_max_width)
            spec["width"] = int(widest * self._width_multiplier)

    def format_data(self) -> tuple[Sequence[ColumnSpec], Sequence[TRow]]:
        """Format the load info and return ``(column_specs, rows)``."""
        self._format_data()
        self._update_max_col_width()
        return self._COL_SPECS, self._formatted_data
class JobFailedDataFormatter(BaseDataFormatter):
    """Formats the failed jobs of a ``LoadInfo`` into Slack-table rows."""

    def __init__(self, data: LoadInfo):
        # fmt: off
        self._COL_SPECS: Sequence[ColumnSpec] = [
            {"title": "Table", "data_index": "table_name", "base_path": "job_file_info"},
            {"title": "Started on", "data_index": "created_at"},
            {"title": "Message", "data_index": "failed_message"},
        ]
        # fmt: on
        super().__init__(data)
        self._failed_jobs = self._get_failed_jobs()

    def _get_failed_jobs(self) -> list[LoadJobInfo]:
        # Flatten the failed jobs of every load package into a single list.
        jobs: list[LoadJobInfo] = []
        for package in self.data.load_packages:
            jobs.extend(package.jobs["failed_jobs"])
        return jobs

    def _format_obj(self, obj: Any) -> str:
        """Render *obj* as a single-line string suitable for a table cell."""
        if isinstance(obj, datetime.datetime):
            obj = obj.strftime("%Y-%m-%d %H:%M:%S")
        if callable(obj):
            obj = cast(str, obj())
        return str(obj).replace("\n", " ")

    def _get_nested_attr(self, obj: Any, attr_path: str) -> str:
        """Follow a dotted attribute path, tolerating missing links (→ None)."""
        target = obj
        for name in attr_path.split("."):
            target = getattr(target, name, None)
        return self._format_obj(target)

    def _format_data(self) -> None:
        rows = []
        for job in self._failed_jobs:
            row = {}
            for spec in self._COL_SPECS:
                index = spec["data_index"]
                prefix = spec.get("base_path")
                path = f"{prefix}.{index}" if prefix else index
                row[index] = self._get_nested_attr(job, path)
            rows.append(row)
        self._formatted_data = rows
class SchemaChangesFormatter(BaseDataFormatter):
    """Formats schema updates (new tables/columns) of a ``LoadInfo`` into rows."""

    def __init__(self, data: LoadInfo):
        self._COL_SPECS: Sequence[ColumnSpec] = [
            {"title": "Table", "data_index": "table_name"},
            {"title": "Source", "data_index": "schema_name"},
            {"title": "Column", "data_index": "column"},
            {"title": "Data type", "data_index": "data_type"},
        ]
        super().__init__(data)

    def _format_data(self) -> None:
        """Populate ``_formatted_data`` with one row per changed column.

        Table and source names appear only on the first row of each table so
        grouped changes read cleanly in the rendered table.
        """
        formatted_data = []
        for package in self.data.load_packages:
            # Note: iterating schema_update directly already skips empty
            # updates; no separate emptiness check is needed.
            for table_name, table in package.schema_update.items():
                for i, (column_name, column) in enumerate(table["columns"].items()):
                    formatted_data.append(
                        {
                            # Blank out repeats after the first row of a table.
                            "table_name": table_name if i == 0 else "",
                            "schema_name": package.schema_name if i == 0 else "",
                            "column": column_name,
                            "data_type": column["data_type"],
                        }
                    )
        self._formatted_data = formatted_data
@overload
def get_data_formatter(
    alert_type: Literal["failed_jobs"], data: LoadInfo
) -> JobFailedDataFormatter:
    ...


@overload
def get_data_formatter(
    alert_type: Literal["schema_changes"], data: LoadInfo
) -> SchemaChangesFormatter:
    ...


def get_data_formatter(
    alert_type: Literal["failed_jobs", "schema_changes"], data: LoadInfo
) -> JobFailedDataFormatter | SchemaChangesFormatter:
    """Return the formatter matching *alert_type*; raise ``ValueError`` otherwise."""
    formatter_classes = {
        "failed_jobs": JobFailedDataFormatter,
        "schema_changes": SchemaChangesFormatter,
    }
    formatter_cls = formatter_classes.get(alert_type)
    if formatter_cls is None:
        raise ValueError("Unsupported alert type")
    return formatter_cls(data)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import traceback | |
from typing import Any, Callable, Sequence | |
import dlt | |
from dlt.common.pipeline import LoadInfo | |
from dlt.common.runtime.slack import send_slack_message | |
from loguru import logger | |
from formatters import get_data_formatter | |
from slack_table import slack_table | |
# Name of the environment this pipeline runs in, read from the CI_ENVIRONMENT
# env var (None when unset); interpolated into the crash-alert Slack message.
CI_ENVIRONMENT = os.getenv('CI_ENVIRONMENT')
def _check_failed_jobs(load_info: LoadInfo) -> None:
    """Log and post a Slack warning table listing failed jobs, if any."""
    if not load_info.has_failed_jobs:
        return
    columns, rows = get_data_formatter("failed_jobs", load_info).format_data()
    slack_message = slack_table(
        "⚠️ Warning - the following jobs have failed",
        f"Dataset ⟶ `{load_info.pipeline.dataset_name}`",
        columns,
        rows,
    )
    # Code fences render as noise in the terminal; strip them for the log copy.
    logger.warning("\n" + slack_message.replace("```", "\n"))
    send_slack_message(
        load_info.pipeline.runtime_config.slack_incoming_hook, slack_message
    )
def _check_schema_changes(load_info: LoadInfo) -> None:
    """Log and post a Slack info table when schema changes are detected.

    Skipped on the pipeline's first run, and when no changes were found.
    """
    if load_info.pipeline.first_run:
        return
    columns, rows = get_data_formatter("schema_changes", load_info).format_data()
    if not rows:
        return
    slack_message = slack_table(
        "🧩 Info - schema changes (new columns/tables) detected",
        f"Dataset ⟶ `{load_info.pipeline.dataset_name}`",
        columns,
        rows,
    )
    # Code fences render as noise in the terminal; strip them for the log copy.
    logger.warning("\n" + slack_message.replace("```", "\n"))
    send_slack_message(
        load_info.pipeline.runtime_config.slack_incoming_hook, slack_message
    )
# NOTE(review): the `traceback` parameter shadows the imported `traceback`
# module; harmless here (the module is not used inside this function), but
# renaming would break keyword callers, so it is kept as-is.
def _alert_failure(exception: Exception, traceback: str) -> None:
    """Send a crash alert (error summary plus formatted traceback) to Slack."""
    header = f"*🔥 Error - pipeline crashed in `{CI_ENVIRONMENT}`*\n\n"
    details = f"\tError message ⟶ `{repr(exception)}`\n\n"
    slack_message = "<!here>\n\n" + header + details + f"```{traceback}```"
    send_slack_message(dlt.config["runtime.slack_incoming_hook"], slack_message)
def pipeline_sentry(fn: Callable[[], Sequence[LoadInfo]]) -> Callable[[], None]:
    """Decorator: run a pipeline entry point and report outcomes to Slack.

    On success, each returned ``LoadInfo`` is logged and checked for failed
    jobs and schema changes. On any exception, a crash alert is posted and the
    exception is re-raised so the caller/CI still observes the failure.
    """
    from functools import wraps  # function-scope import keeps the module imports untouched

    @wraps(fn)  # preserve fn's __name__/__doc__ for logging and debugging
    def wrapper(*args: Any, **kwargs: Any) -> None:
        try:
            logger.info(f"Starting pipeline in {fn.__module__}")
            pipeline_load_infos = fn(*args, **kwargs)
            for load_info in pipeline_load_infos:
                logger.success(load_info)
                _check_failed_jobs(load_info)
                _check_schema_changes(load_info)
        except Exception as e:
            logger.exception(e)
            tb = traceback.format_exc()
            _alert_failure(e, tb)
            # Re-raise so the process exits non-zero and the run is marked failed.
            raise
    return wrapper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# inspired by https://github.com/idw111/slack-table | |
from typing import Literal, NotRequired, Sequence, TypedDict, cast | |
class ColumnSpec(TypedDict):
    """Rendering specification for one column of the ASCII table."""

    title: str  # header text shown in the table's first row
    data_index: str  # key looked up in each row dict to get the cell value
    base_path: NotRequired[str]  # optional dotted attribute prefix used by formatters
    width: NotRequired[int]  # column width in characters; helpers default to 10
    align: NotRequired[str]  # "right" pads on the left; anything else pads right

# A table row: maps each column's data_index to the cell text.
TRow = dict[str, str]
def _pad_left(text: str = "", max_length: int = 13) -> str: | |
return text[:max_length].rjust(max_length) | |
def _pad_right(text: str = "", max_length: int = 13) -> str: | |
return text[:max_length].ljust(max_length) | |
def _fill_dash(length: int) -> str: | |
return "-" * length | |
def _get_lines(columns: Sequence[ColumnSpec]) -> str:
    """Return the separator rule spanning all columns and the gaps between them.

    Each column contributes its width (default 10); columns are joined by a
    single space, hence the extra ``len(columns) - 1``.
    """
    # Generator expression avoids materializing a throwaway list inside sum().
    total_length = sum(col.get("width", 10) for col in columns) + len(columns) - 1
    return _fill_dash(total_length)
def _get_col_value(col: ColumnSpec, row: TRow) -> str:
    """Render one cell: the row's value for this column, truncated and padded."""
    width = col.get("width", 10)
    raw_value = str(row.get(col["data_index"], ""))
    if col.get("align", "left") == "right":
        return _pad_left(raw_value, width)
    return _pad_right(raw_value, width)
def _get_row(columns: Sequence[ColumnSpec], row: TRow | Literal["-"]) -> str:
    """Render one table row; the sentinel value "-" renders a separator rule."""
    if row != "-":
        return " ".join(_get_col_value(col, cast(TRow, row)) for col in columns)
    return _get_lines(columns)
def _get_header_col(col: ColumnSpec) -> str:
    """Render this column's header cell, padded per the column's alignment."""
    pad = _pad_right
    if col.get("align", "left") == "right":
        pad = _pad_left
    return pad(col.get("title", ""), col.get("width", 10))
def _get_header_row(columns: Sequence[ColumnSpec]) -> str:
    """Render the header row by joining each column's padded title."""
    header_cells = [_get_header_col(col) for col in columns]
    return " ".join(header_cells)
# WARN: currently breaks formatting in slack if message longer than 4k characters
def slack_table(
    title: str = "",
    subtitle: str = "",
    columns: Sequence[ColumnSpec] = (),
    data_source: Sequence[TRow] = (),
) -> str:
    """Build a Slack message: title, subtitle and a monospace ASCII table.

    Args:
        title: Bold headline shown after the ``<!here>`` mention.
        subtitle: Indented line rendered under the title.
        columns: Column specs (titles, widths, alignment).
        data_source: One dict per table row, keyed by each column's data_index.

    Returns:
        The full message text with the table inside a code fence.
    """
    # Immutable () defaults replace the original mutable [] defaults (classic
    # shared-default pitfall); both are only iterated, so behavior is unchanged.
    table_data = [
        _get_header_row(columns),
        _get_row(columns, "-"),  # separator between header and body
        *(_get_row(columns, row) for row in data_source),
    ]
    return (
        f"<!here>\n\n*{title}*\n\n"
        + f"\t{subtitle}\n\n```"
        + "\n".join(table_data)
        + "\n```\n"
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage: install loguru for hassle-free, out-of-the-box pretty logging.