Created
February 20, 2024 13:01
-
-
Save sector119/7461d181830ec1232c242e5aa8110152 to your computer and use it in GitHub Desktop.
taskiq cron scheduling with structlog and sentry middlewares
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from pydantic import AmqpDsn | |
from pydantic import BaseSettings | |
from pydantic import HttpUrl | |
from pydantic import PostgresDsn | |
from pydantic import RedisDsn | |
from . import VERSION | |
class PostgresURI(PostgresDsn):
    """PostgreSQL DSN restricted to SQLAlchemy's psycopg (v3) and psycopg2 drivers."""

    # Only these URL schemes pass validation.
    allowed_schemes = {
        "postgresql+psycopg2",
        "postgresql+psycopg",
    }
class Settings(BaseSettings):
    """Application settings read from environment variables and an optional env file."""

    # Defaults to the package VERSION imported via ``from . import VERSION``.
    VERSION: str = VERSION
    ENVIRONMENT: str = "production"
    # When left as None, callers skip Sentry setup (they check ``if settings.SENTRY_DSN``).
    SENTRY_DSN: HttpUrl | None = None
    # Fraction of transactions traced by Sentry; 1.0 traces every transaction.
    SENTRY_TRACES_SAMPLE_RATE: float = 1.0
    # NOTE(review): the broker setup in this gist builds a Redis ListQueueBroker
    # from TASKIQ_BACKEND_URL and never reads this value — confirm it is still needed.
    TASKIQ_BROKER_URL: AmqpDsn
    TASKIQ_BACKEND_URL: RedisDsn
    SQLALCHEMY_DATABASE_ASYNC_URI: PostgresURI
    # Logger name passed to structlog.get_logger() for the tasks logger.
    EPSILON_TASKS_LOGGER_NAME: str

    class Config:
        # Env file path comes from EPSILON_TASKS_ENV; None when that variable is
        # unset, in which case no env file is read.
        env_file = os.getenv("EPSILON_TASKS_ENV")
        env_file_encoding = "utf-8"
        # Unknown keys are ignored rather than rejected.
        extra = "ignore"


# Built at import time, so importing this module validates the configuration
# and fails if a required setting (one without a default above) is missing.
settings = Settings()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging.config | |
import typing | |
import orjson | |
import structlog | |
from structlog.types import EventDict | |
from structlog.types import Processor | |
# https://github.com/hynek/structlog/issues/35#issuecomment-591321744
def rename_event_key(_, __, event_dict: EventDict) -> EventDict:
    """Move the log text from the ``event`` key to the ``message`` key.

    structlog stores the message under ``event``, whereas Datadog reads
    ``message``. Raises ``KeyError`` if ``event`` is missing.

    See https://github.com/hynek/structlog/issues/35#issuecomment-591321744
    """
    text = event_dict.pop("event")
    event_dict["message"] = text
    return event_dict
def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict:
    """Remove Uvicorn's duplicate ``color_message`` entry when it is present.

    Uvicorn repeats the log message in ``color_message``; it is redundant here.
    """
    if "color_message" in event_dict:
        del event_dict["color_message"]
    return event_dict
def decode_bytes(_: typing.Any, __: typing.Any, bs: bytes) -> str:
    """Convert the ``bytes`` emitted by ``orjson.dumps`` into a UTF-8 ``str``.

    Stdlib logging formatters must produce text, so this runs after the
    JSON renderer.
    """
    return str(bs, encoding="utf-8")
# Processors shared by both logging paths. setup_logging() starts the structlog
# chain with a copy of this list, and every formatter in ``config`` uses it as
# ``foreign_pre_chain`` for records emitted through the plain stdlib logging API.
# Order matters: each processor receives the event dict returned by the previous one.
shared_processors: list[Processor] = [
    # Merge values bound via structlog.contextvars.bind_contextvars().
    structlog.contextvars.merge_contextvars,
    structlog.stdlib.add_logger_name,
    structlog.stdlib.add_log_level,
    # Apply %-style positional arguments passed to the logging call.
    structlog.stdlib.PositionalArgumentsFormatter(),
    # Copy ``extra=`` attributes of stdlib LogRecords into the event dict.
    structlog.stdlib.ExtraAdder(),
    # Local time (utc=False); the format has no timezone field.
    structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.UnicodeDecoder(),
    drop_color_message_key,
]
# Default ``logging.config.dictConfig`` for the application. All formatters are
# structlog ProcessorFormatters that share ``shared_processors`` as their
# foreign_pre_chain, so stdlib and structlog records are rendered alike.
# Callers may ``update()`` this module-level dict in place before passing it
# to setup_logging().
config = {
    "version": 1,
    # Keep loggers that already exist when dictConfig runs enabled.
    "disable_existing_loggers": False,
    "formatters": {
        # Human-readable console output.
        "default": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processors": [
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.dev.ConsoleRenderer(),
            ],
            "foreign_pre_chain": shared_processors,
        },
        # Console output with structlog's plain-text traceback formatter.
        "console-plain": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processors": [
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.dev.ConsoleRenderer(
                    exception_formatter=structlog.dev.plain_traceback
                ),
            ],
            "foreign_pre_chain": shared_processors,
        },
        # One JSON document per record, serialised with orjson.
        "json": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processors": [
                # We rename the `event` key to `message` only in JSON logs, as Datadog looks for the
                # `message` key but the pretty ConsoleRenderer looks for `event`
                # rename_event_key, # uncomment if ddtrace is in use
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                # structlog.processors.dict_tracebacks, # format traceback as dict
                structlog.processors.format_exc_info, # format traceback as string for pretty printing
                structlog.processors.JSONRenderer(serializer=orjson.dumps),
                # orjson.dumps returns bytes; the formatter must return str.
                decode_bytes,
            ],
            "foreign_pre_chain": shared_processors,
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stderr",
        },
    },
    # NOTE(review): this configures only a logger literally named "default" (not
    # the root logger) with propagate=False. Records from other loggers are not
    # routed to the handler above unless an override adds them — confirm this
    # matches EPSILON_TASKS_LOGGER_NAME / the YAML overrides in use.
    "loggers": {
        "default": {
            "level": "DEBUG",
            "handlers": ["default"],
            "propagate": False,
        },
    },
}
def setup_logging(
    config: dict, extra_processors: list[Processor] | None = None
) -> None:
    """Apply *config* via dictConfig and route structlog through its formatters.

    The structlog processor chain is ``shared_processors``, then
    *extra_processors* (if any), and finally
    ``ProcessorFormatter.wrap_for_formatter``. Final rendering is left to the
    ProcessorFormatters declared in *config*.
    """
    logging.config.dictConfig(config)

    chain: list[Processor] = [
        *shared_processors,
        *(extra_processors or ()),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ]

    structlog.configure(
        processors=chain,
        logger_factory=structlog.stdlib.LoggerFactory(),
        # structlog.stdlib.AsyncBoundLogger is the async alternative
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BaseError(Exception):
    """Root of this package's exception hierarchy; catch it to handle any of them."""
class ImproperlyConfigured(BaseError):
    """Raised when configuration is incomplete, e.g. a required environment variable is unset."""
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import importlib | |
import pathlib | |
import typing | |
from datetime import timedelta | |
import yaml | |
from pydantic import BaseModel | |
from pydantic import validator | |
from taskiq.decor import AsyncTaskiqDecoratedTask | |
from taskiq.scheduler.scheduled_task import CronSpec | |
# Timezone name used as the default ``offset`` of TaskSchedule cron specs.
DEFAULT_TIMEZONE = "Europe/Kyiv"
def import_task(path: str) -> AsyncTaskiqDecoratedTask:
    """Import and return the taskiq task referenced by *path*.

    *path* is either ``"package.module.task_name"`` (split on the last ``.``)
    or ``"package.module:task_name"`` (the ``module:attribute`` form).

    Raises:
        ValueError: if *path* has no module prefix or no task name, the module
            or attribute cannot be found, or the attribute is not an
            ``AsyncTaskiqDecoratedTask``. ValueError is used so that pydantic
            validators calling this report a validation error.
    """
    if ":" in path:
        module_name, _, task_name = path.partition(":")
    else:
        module_name, _, task_name = path.rpartition(".")
    if not module_name or not task_name:
        raise ValueError(
            f"Can't import '{path}', it must be prefixed with module name"
        )
    # Keep the try body to the two calls that can raise the caught errors.
    try:
        module = importlib.import_module(module_name)
        task = getattr(module, task_name)
    except (ModuleNotFoundError, AttributeError) as e:
        raise ValueError(e) from e
    if not isinstance(task, AsyncTaskiqDecoratedTask):
        raise ValueError(
            f"'{task_name}' must be decorated async task, got '{type(task)}' type instead"
        )
    return task
class TaskSchedule(CronSpec):
    """taskiq ``CronSpec`` whose ``offset`` defaults to ``DEFAULT_TIMEZONE``."""

    # Replaces CronSpec's own default, so schedules that do not set ``offset``
    # use DEFAULT_TIMEZONE. The annotation also allows a timedelta or None.
    offset: str | timedelta | None = DEFAULT_TIMEZONE

    class Config:
        # Unknown keys in a schedule mapping are rejected (catches typos).
        extra = "forbid"
class Task(BaseModel):
    """One entry of the beat-schedule config.

    ``callable`` may be given as an import path string, which the validator
    resolves with ``import_task``. ``args``/``kwargs`` are forwarded together
    with ``schedule`` to the task's ``schedule_by_cron``.
    """

    # NOTE: the field name shadows the builtin ``callable`` inside this class
    # body; it is kept because callers read ``task.callable``.
    callable: AsyncTaskiqDecoratedTask
    # Optional so that entries for argument-less tasks may omit them.
    # pydantic copies field defaults per instance, so these literals are not shared.
    args: list[typing.Any] = []
    kwargs: dict[str, typing.Any] = {}
    schedule: TaskSchedule

    @validator("callable", pre=True)
    def _import_task(cls, v: typing.Any) -> AsyncTaskiqDecoratedTask:
        # Allow an already-imported task object (programmatic construction)
        # as well as an import-path string.
        if isinstance(v, AsyncTaskiqDecoratedTask):
            return v
        return import_task(v)

    class Config:
        # AsyncTaskiqDecoratedTask is not a pydantic-aware type.
        arbitrary_types_allowed = True
        extra = "forbid"
class BeatSchedule(BaseModel):
    """Custom-root model: the whole config document is a list of ``Task`` entries."""

    __root__: list[Task]

    def __iter__(self) -> typing.Iterator[Task]:
        # Iterate the tasks themselves rather than pydantic's (field, value) pairs.
        tasks = self.__root__
        return iter(tasks)

    def __getitem__(self, item) -> Task:
        tasks = self.__root__
        return tasks[item]
def get_tasks_from_config(config: pathlib.Path) -> typing.Iterator[Task]:
    """Parse the YAML beat-schedule file at *config* and iterate its tasks.

    The file is decoded as UTF-8 instead of the locale encoding. An empty
    file yields no tasks.

    Raises:
        pydantic.ValidationError: if an entry is malformed or its callable
            cannot be imported.
    """
    document = yaml.safe_load(config.read_text(encoding="utf-8"))
    # safe_load returns None for an empty document; treat it as an empty list.
    if document is None:
        document = []
    return iter(BeatSchedule.parse_obj(document))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import structlog | |
import yaml | |
from structlog.types import Processor | |
from structlog_sentry import SentryProcessor | |
from epsilon.lib.logging import config | |
from epsilon.lib.logging import setup_logging | |
from .config import settings | |
from .exceptions import ImproperlyConfigured | |
if "EPSILON_TASKS_LOG_CONFIG" not in os.environ:
    raise ImproperlyConfigured(
        "Can't setup logging. EPSILON_TASKS_LOG_CONFIG environment variable isn't set"
    )

# Overlay the YAML file onto the library's default dictConfig. This is a
# shallow, in-place update: every top-level key present in the file
# (e.g. "formatters", "handlers", "loggers") replaces the default one entirely.
with open(os.environ["EPSILON_TASKS_LOG_CONFIG"], encoding="utf-8") as file:
    # An empty file loads as None; treat it as "no overrides".
    config.update(yaml.safe_load(file) or {})

# Forward structlog events to Sentry only when a DSN is configured.
extra_processors: list[Processor] | None = (
    [SentryProcessor()] if settings.SENTRY_DSN else None
)

setup_logging(config, extra_processors=extra_processors)

logger = structlog.get_logger(settings.EPSILON_TASKS_LOGGER_NAME)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pathlib | |
from taskiq import TaskiqScheduler | |
from taskiq_redis import ListQueueBroker | |
from taskiq_redis import RedisScheduleSource | |
from .config import settings | |
from .exceptions import ImproperlyConfigured | |
from .loader import get_tasks_from_config | |
from .logging import logger | |
from .middlewares import SentryMiddleware | |
from .middlewares import StructlogMiddleware | |
if "EPSILON_TASKS_CONFIG" not in os.environ:
    raise ImproperlyConfigured(
        "Can't setup tasks. EPSILON_TASKS_CONFIG environment variable isn't set"
    )

# NOTE(review): the broker is built from TASKIQ_BACKEND_URL (a Redis DSN).
# ListQueueBroker comes from taskiq_redis, so TASKIQ_BROKER_URL (AMQP) is
# presumably unusable here — confirm which setting is intended.
broker = ListQueueBroker(settings.TASKIQ_BACKEND_URL)

middlewares = [
    StructlogMiddleware(),
]

if settings.SENTRY_DSN:
    # SentryMiddleware.__init__ calls setup_sentry() itself, so it is not also
    # called here (doing both would initialise the Sentry SDK twice).
    middlewares.append(SentryMiddleware())

broker.add_middlewares(*middlewares)
async def get_scheduler() -> TaskiqScheduler:
    """Create a Redis-backed TaskiqScheduler and register the configured cron tasks.

    Task definitions come from the YAML file named by EPSILON_TASKS_CONFIG.
    Each one is scheduled on the Redis schedule source with its cron spec,
    args and kwargs.
    """
    source = RedisScheduleSource(settings.TASKIQ_BACKEND_URL)
    scheduler = TaskiqScheduler(broker, sources=[source])

    config_path = pathlib.Path(os.environ["EPSILON_TASKS_CONFIG"])
    # NOTE(review): schedules are written to Redis. If they persist across
    # restarts, every call may register each task again — confirm whether
    # existing schedules should be cleared first.
    for entry in get_tasks_from_config(config_path):
        logger.debug(f"Scheduling task {entry}")
        await entry.callable.schedule_by_cron(
            source, entry.schedule, *entry.args, **entry.kwargs
        )

    return scheduler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing | |
import sentry_sdk | |
import structlog | |
from taskiq import TaskiqMessage | |
from taskiq import TaskiqMiddleware | |
from taskiq import TaskiqResult | |
from .logging import logger | |
from .sentry import error_capturer | |
from .sentry import setup_sentry | |
class StructlogMiddleware(TaskiqMiddleware):
    """Bind the taskiq task id and name into structlog's contextvars per execution."""

    def pre_execute(
        self,
        message: TaskiqMessage,
    ) -> (
        TaskiqMessage | typing.Coroutine[typing.Any, typing.Any, TaskiqMessage]
    ):
        # Reset first so values bound earlier in this context cannot leak
        # into this task's log lines.
        structlog.contextvars.clear_contextvars()
        task_context = {
            "taskiq_task_id": message.task_id,
            "taskiq_task_name": message.task_name,
        }
        structlog.contextvars.bind_contextvars(**task_context)
        return message
class SentryMiddleware(TaskiqMiddleware):
    """Report failed taskiq tasks to Sentry and log them with their traceback."""

    def __init__(self):
        super().__init__()
        # Initialise the Sentry SDK as soon as the middleware is created.
        setup_sentry()

    def on_error(
        self,
        message: TaskiqMessage,
        result: TaskiqResult[typing.Any],
        exception: BaseException,
    ) -> typing.Coroutine[typing.Any, typing.Any, None] | None:
        # push_scope gives this report its own scope, so the task tags are
        # attached to this event only and do not leak into later events.
        with sentry_sdk.push_scope() as scope:
            scope.set_tag("taskiq_task_name", message.task_name)
            scope.set_tag("taskiq_task_id", message.task_id)
            error_capturer(exception)
        # taskiq presumably calls on_error after its own ``except`` block has
        # finished, so logger.exception (exc_info=True -> sys.exc_info()) would
        # find no active exception. Passing the exception keeps the traceback.
        # NOTE(review): with SentryProcessor enabled, this error-level record
        # may create a second Sentry event — confirm whether that is wanted.
        logger.error(str(exception), exc_info=exception)
        return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sentry_sdk | |
from sentry_sdk.integrations.logging import LoggingIntegration | |
from .config import settings | |
def setup_sentry() -> None:
    """Initialise the Sentry SDK from ``settings``.

    Raises:
        RuntimeError: if ``settings.SENTRY_DSN`` is not set.
    """
    if not settings.SENTRY_DSN:
        raise RuntimeError(
            "Can't setup Sentry. SENTRY_DSN config parameter is not set"
        )

    dsn = str(settings.SENTRY_DSN)
    # level=None / event_level=None turn off stdlib-logging breadcrumbs and
    # events; presumably structlog's SentryProcessor reports log events instead.
    logging_integration = LoggingIntegration(level=None, event_level=None)

    sentry_sdk.init(
        dsn,
        integrations=[logging_integration],
        release=settings.VERSION,
        environment=settings.ENVIRONMENT,
        # Between 0 and 1; e.g. 0.2 traces 20 % of transactions.
        traces_sample_rate=settings.SENTRY_TRACES_SAMPLE_RATE,
        # NOTE(review): attaches personally identifiable information — confirm this is intended.
        send_default_pii=True,
    )
def error_capturer(exception: BaseException) -> None:
    """Report *exception* to Sentry; the returned event id is discarded."""
    sentry_sdk.capture_exception(error=exception)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import date

from epsilon_tasks.logging import logger
from epsilon_tasks.main import broker
@broker.task("billing:sync-payments")
async def sync_payments(
    *,
    schema: str,
    external_source: str | list[str] | None = None,
    rpc_source: str | list[str] | None = None,
    commit_date_start: date | None = None,
    commit_date_end: date | None = None,
) -> None:
    """Synchronise payments (currently a placeholder that only logs).

    Registered on the broker as ``billing:sync-payments``. All parameters are
    keyword-only and not yet used by the body.

    Args:
        schema: presumably the database schema to operate on — TODO confirm.
        external_source: one source name, a list of names, or None.
        rpc_source: one source name, a list of names, or None.
        commit_date_start: optional start date.
        commit_date_end: optional end date.
    """
    logger.info("Syncing payments...")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ``callable`` is a dotted import path "package.module.task_name"; the loader
# splits it on the last "." into module and task name.
- callable: epsilon_tasks.tasks.billing.payments.sync_payments
  args: []
  kwargs:
    schema: ifrankivsk
    external_source: pret
    rpc_source: abank
  schedule:
    minutes: "*/1"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for sharing! I extended the taskiq middleware for Sentry; you might find it useful:
It sets the transaction name and adds context information about the taskiq task.
By the way, structlog has a built-in processor for renaming the event key:
structlog.processors.EventRenamer("message")
You can use it instead of
rename_event_key.
You can also add the
@monitor
decorator to your tasks so that you can monitor the jobs in Sentry's "Crons" tab.