@sector119
Created February 20, 2024 13:01
taskiq cron scheduling with structlog and sentry middlewares

# epsilon_tasks/config.py (file names below are inferred from the imports in this gist)
import os

from pydantic import AmqpDsn
from pydantic import BaseSettings
from pydantic import HttpUrl
from pydantic import PostgresDsn
from pydantic import RedisDsn

from . import VERSION


class PostgresURI(PostgresDsn):
    allowed_schemes = {
        "postgresql+psycopg",
        "postgresql+psycopg2",
    }


class Settings(BaseSettings):
    VERSION: str = VERSION
    ENVIRONMENT: str = "production"

    SENTRY_DSN: HttpUrl | None = None
    SENTRY_TRACES_SAMPLE_RATE: float = 1.0

    TASKIQ_BROKER_URL: AmqpDsn
    TASKIQ_BACKEND_URL: RedisDsn

    SQLALCHEMY_DATABASE_ASYNC_URI: PostgresURI

    EPSILON_TASKS_LOGGER_NAME: str

    class Config:
        env_file = os.getenv("EPSILON_TASKS_ENV")
        env_file_encoding = "utf-8"
        extra = "ignore"


settings = Settings()

# epsilon/lib/logging.py
import logging.config
import typing

import orjson
import structlog
from structlog.types import EventDict
from structlog.types import Processor


# https://github.com/hynek/structlog/issues/35#issuecomment-591321744
def rename_event_key(_, __, event_dict: EventDict) -> EventDict:
    """
    Log entries keep the text message in the `event` field, but Datadog
    uses the `message` field. This processor moves the value from one field
    to the other.

    See https://github.com/hynek/structlog/issues/35#issuecomment-591321744
    """
    event_dict["message"] = event_dict.pop("event")
    return event_dict


def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict:
    """
    Uvicorn logs the message a second time in the extra `color_message`, but we
    don't need it. This processor drops the key from the event dict if it exists.
    """
    event_dict.pop("color_message", None)
    return event_dict


def decode_bytes(_: typing.Any, __: typing.Any, bs: bytes) -> str:
    """orjson returns bytes, so decode the rendered log line back to str."""
    return bs.decode()


shared_processors: list[Processor] = [
    structlog.contextvars.merge_contextvars,
    structlog.stdlib.add_logger_name,
    structlog.stdlib.add_log_level,
    structlog.stdlib.PositionalArgumentsFormatter(),
    structlog.stdlib.ExtraAdder(),
    structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.UnicodeDecoder(),
    drop_color_message_key,
]

config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processors": [
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.dev.ConsoleRenderer(),
            ],
            "foreign_pre_chain": shared_processors,
        },
        "console-plain": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processors": [
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.dev.ConsoleRenderer(
                    exception_formatter=structlog.dev.plain_traceback
                ),
            ],
            "foreign_pre_chain": shared_processors,
        },
        "json": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processors": [
                # Rename the `event` key to `message` only in JSON logs: Datadog looks
                # for `message`, while the pretty ConsoleRenderer looks for `event`.
                # rename_event_key,  # uncomment if ddtrace is in use
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                # structlog.processors.dict_tracebacks,  # format traceback as dict
                structlog.processors.format_exc_info,  # format traceback as string for pretty printing
                structlog.processors.JSONRenderer(serializer=orjson.dumps),
                decode_bytes,
            ],
            "foreign_pre_chain": shared_processors,
        },
    },
    "handlers": {
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stderr",
        },
    },
    "loggers": {
        "default": {
            "level": "DEBUG",
            "handlers": ["default"],
            "propagate": False,
        },
    },
}


def setup_logging(
    config: dict, extra_processors: list[Processor] | None = None
) -> None:
    logging.config.dictConfig(config)

    processors = shared_processors[:]
    if extra_processors:
        processors.extend(extra_processors)
    processors.append(structlog.stdlib.ProcessorFormatter.wrap_for_formatter)

    structlog.configure(
        processors=processors,
        logger_factory=structlog.stdlib.LoggerFactory(),
        # use structlog.stdlib.AsyncBoundLogger for async logging
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

# epsilon_tasks/exceptions.py
class BaseError(Exception):
    """Base class for all errors."""


class ImproperlyConfigured(BaseError):
    """Something is somehow improperly configured."""

# epsilon_tasks/loader.py
import importlib
import pathlib
import typing
from datetime import timedelta

import yaml
from pydantic import BaseModel
from pydantic import validator
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.scheduler.scheduled_task import CronSpec

DEFAULT_TIMEZONE = "Europe/Kyiv"


def import_task(path: str) -> AsyncTaskiqDecoratedTask:
    if "." not in path:
        raise ValueError(
            f"Can't import '{path}', it must be prefixed with module name"
        )

    module_name, task_name = path.rsplit(".", 1)
    try:
        module = importlib.import_module(module_name)
        task = getattr(module, task_name)
        if not isinstance(task, AsyncTaskiqDecoratedTask):
            raise ValueError(
                f"'{task_name}' must be a decorated async task, got '{type(task)}' instead"
            )
    except (ModuleNotFoundError, AttributeError) as e:
        raise ValueError(e) from e

    return task


class TaskSchedule(CronSpec):
    offset: str | timedelta | None = DEFAULT_TIMEZONE

    class Config:
        extra = "forbid"


class Task(BaseModel):
    callable: AsyncTaskiqDecoratedTask
    args: list[typing.Any]
    kwargs: dict[str, typing.Any]
    schedule: TaskSchedule

    @validator("callable", pre=True)
    def _import_task(cls, v: str) -> AsyncTaskiqDecoratedTask:
        return import_task(v)

    class Config:
        arbitrary_types_allowed = True
        extra = "forbid"


class BeatSchedule(BaseModel):
    __root__: list[Task]

    def __iter__(self) -> typing.Iterator[Task]:
        return iter(self.__root__)

    def __getitem__(self, item) -> Task:
        return self.__root__[item]


def get_tasks_from_config(config: pathlib.Path) -> typing.Iterator[Task]:
    return iter(BeatSchedule.parse_obj(yaml.safe_load(config.read_text())))

# epsilon_tasks/logging.py
import os

import structlog
import yaml
from structlog.types import Processor
from structlog_sentry import SentryProcessor

from epsilon.lib.logging import config
from epsilon.lib.logging import setup_logging

from .config import settings
from .exceptions import ImproperlyConfigured

if "EPSILON_TASKS_LOG_CONFIG" not in os.environ:
    raise ImproperlyConfigured(
        "Can't set up logging. EPSILON_TASKS_LOG_CONFIG environment variable isn't set"
    )

with open(os.environ["EPSILON_TASKS_LOG_CONFIG"]) as file:
    config.update(yaml.safe_load(file.read()))

extra_processors: list[Processor] | None
if settings.SENTRY_DSN:
    extra_processors = [SentryProcessor()]
else:
    extra_processors = None

setup_logging(config, extra_processors=extra_processors)

logger = structlog.get_logger(settings.EPSILON_TASKS_LOGGER_NAME)

# epsilon_tasks/main.py
import os
import pathlib

from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker
from taskiq_redis import RedisScheduleSource

from .config import settings
from .exceptions import ImproperlyConfigured
from .loader import get_tasks_from_config
from .logging import logger
from .middlewares import SentryMiddleware
from .middlewares import StructlogMiddleware

if "EPSILON_TASKS_CONFIG" not in os.environ:
    raise ImproperlyConfigured(
        "Can't set up tasks. EPSILON_TASKS_CONFIG environment variable isn't set"
    )

broker = ListQueueBroker(settings.TASKIQ_BACKEND_URL)

middlewares = [
    StructlogMiddleware(),
]
if settings.SENTRY_DSN:
    from .sentry import setup_sentry

    setup_sentry()
    middlewares.append(SentryMiddleware())

broker.add_middlewares(*middlewares)


async def get_scheduler() -> TaskiqScheduler:
    schedule_source = RedisScheduleSource(settings.TASKIQ_BACKEND_URL)
    scheduler = TaskiqScheduler(broker, sources=[schedule_source])

    tasks = get_tasks_from_config(
        pathlib.Path(os.environ["EPSILON_TASKS_CONFIG"])
    )
    for task in tasks:
        logger.debug(f"Scheduling task {task}")
        await task.callable.schedule_by_cron(
            schedule_source, task.schedule, *task.args, **task.kwargs
        )

    return scheduler

# epsilon_tasks/middlewares.py
import typing

import sentry_sdk
import structlog
from taskiq import TaskiqMessage
from taskiq import TaskiqMiddleware
from taskiq import TaskiqResult

from .logging import logger
from .sentry import error_capturer
from .sentry import setup_sentry


class StructlogMiddleware(TaskiqMiddleware):
    def pre_execute(
        self,
        message: TaskiqMessage,
    ) -> (
        TaskiqMessage | typing.Coroutine[typing.Any, typing.Any, TaskiqMessage]
    ):
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            taskiq_task_id=message.task_id, taskiq_task_name=message.task_name
        )
        return message


class SentryMiddleware(TaskiqMiddleware):
    def __init__(self):
        super().__init__()
        setup_sentry()

    def on_error(
        self,
        message: TaskiqMessage,
        result: TaskiqResult[typing.Any],
        exception: BaseException,
    ) -> typing.Coroutine[typing.Any, typing.Any, None] | None:
        with sentry_sdk.configure_scope():
            error_capturer(exception)
            logger.exception(exception)
        return None

# epsilon_tasks/sentry.py
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

from .config import settings


def setup_sentry() -> None:
    if not settings.SENTRY_DSN:
        raise RuntimeError(
            "Can't set up Sentry. SENTRY_DSN config parameter is not set"
        )

    sentry_sdk.init(
        str(settings.SENTRY_DSN),
        integrations=[
            LoggingIntegration(level=None, event_level=None),
        ],
        release=settings.VERSION,
        environment=settings.ENVIRONMENT,
        # Number between 0 and 1. For example, to send 20% of transactions, set traces_sample_rate to 0.2.
        traces_sample_rate=settings.SENTRY_TRACES_SAMPLE_RATE,
        send_default_pii=True,
    )


def error_capturer(exception: BaseException) -> None:
    sentry_sdk.capture_exception(exception)

# epsilon_tasks/tasks/billing/payments.py
from datetime import date

from epsilon_tasks.logging import logger
from epsilon_tasks.main import broker


@broker.task("billing:sync-payments")
async def sync_payments(
    *,
    schema: str,
    external_source: str | list[str] | None = None,
    rpc_source: str | list[str] | None = None,
    commit_date_start: date | None = None,
    commit_date_end: date | None = None,
) -> None:
    logger.info("Syncing payments...")
    return None

# schedule config (YAML file pointed to by the EPSILON_TASKS_CONFIG environment variable);
# note: import_task() expects a dotted Python path ending in the task name
- callable: epsilon_tasks.tasks.billing.payments.sync_payments
  args: []
  kwargs:
    schema: ifrankivsk
    external_source: pret
    rpc_source: abank
  schedule:
    minutes: "*/1"
@a1d4r

a1d4r commented Jul 3, 2024

Thanks for sharing! I extended the taskiq middleware for sentry, you might find it useful:

# imports assumed for this snippet; jsonable_encoder is taken to be FastAPI's
# fastapi.encoders.jsonable_encoder
from typing import Any, Coroutine

import sentry_sdk
from fastapi.encoders import jsonable_encoder
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class SentryMiddleware(TaskiqMiddleware):
    def pre_execute(
        self, message: TaskiqMessage
    ) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
        with sentry_sdk.configure_scope() as scope:
            scope.set_transaction_name(message.task_name)
            scope.set_tag("task", message.task_name)
            scope.set_context(
                "taskiq",
                jsonable_encoder(
                    {
                        "task_id": message.task_id,
                        "task_name": message.task_name,
                        "labels": message.labels,
                        "args": message.args,
                        "kwargs": message.kwargs,
                    }
                ),
            )
        return message

    def on_error(
        self,
        message: TaskiqMessage,  # noqa: ARG002
        result: TaskiqResult[Any],  # noqa: ARG002
        exception: BaseException,
    ) -> Coroutine[Any, Any, None] | None:
        sentry_sdk.capture_exception(exception)
        return None

It will set the transaction name and add context information about the taskiq task.

By the way, structlog has a built-in processor for renaming the event key:
structlog.processors.EventRenamer("message")
You can use it instead of rename_event_key.
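
For reference, a short sketch of the gist's "json" formatter with EventRenamer swapped in for the commented-out rename_event_key; placing it right before remove_processors_meta and the JSON renderer is my assumption, not something from the gist:

"json": {
    "()": structlog.stdlib.ProcessorFormatter,
    "processors": [
        structlog.processors.EventRenamer("message"),  # built-in replacement for rename_event_key
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(serializer=orjson.dumps),
        decode_bytes,
    ],
    "foreign_pre_chain": shared_processors,
},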

You can also add the @monitor decorator to your tasks, so you can monitor jobs in the "Crons" tab in Sentry.

from sentry_sdk.crons import monitor  # assumed import path for the @monitor decorator


@broker.task(task_name="my_task")
@monitor(monitor_slug="my-task")
async def my_task(...) -> None:
    ...
