Created
October 23, 2023 19:05
-
-
Save bpereto/8b49893f4a19740c19bf336af94aa95b to your computer and use it in GitHub Desktop.
Litestar Sentry Integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import TYPE_CHECKING | |
from sentry_sdk.consts import OP | |
from sentry_sdk.hub import Hub | |
from sentry_sdk.integrations import DidNotEnable, Integration | |
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware | |
from sentry_sdk.tracing import SOURCE_FOR_STYLE, TRANSACTION_SOURCE_ROUTE | |
from sentry_sdk.utils import event_from_exception, transaction_from_function | |
__all__ = ["LitestarIntegration"] | |
try: | |
from litestar import Litestar, Request # type: ignore | |
from litestar.data_extractors import ConnectionDataExtractor | |
from litestar.datastructures import State | |
from litestar.handlers.base import BaseRouteHandler # type: ignore | |
from litestar.middleware import DefineMiddleware # type: ignore | |
from litestar.routes.http import HTTPRoute # type: ignore | |
from litestar.utils import Ref, is_async_callable # type: ignore | |
if TYPE_CHECKING: | |
from typing import Any, Dict, List, Optional, Union | |
from litestar.middleware.base import MiddlewareProtocol | |
from litestar.types import ( # type: ignore | |
ASGIApp, | |
HTTPReceiveMessage, | |
HTTPScope, | |
Message, | |
Middleware, | |
Receive, | |
Scope, | |
Send, | |
WebSocketReceiveMessage, | |
) | |
from sentry_sdk._types import Event | |
except ImportError: | |
msg = "Litestar is not installed" | |
raise DidNotEnable(msg) | |
_DEFAULT_TRANSACTION_NAME = "generic Litestar request" | |
# @TODO: add to sentry-python sdk | |
OP.MIDDLEWARE_LITESTAR = "middleware.litestar" | |
OP.MIDDLEWARE_LITESTAR_RECEIVE = "middleware.litestar.receive" | |
OP.MIDDLEWARE_LITESTAR_SEND = "middleware.litestar.send" | |
class SentryLitestarASGIMiddleware(SentryAsgiMiddleware): | |
def __init__(self, app: "ASGIApp") -> None: | |
super().__init__( | |
app=app, | |
unsafe_context_data=False, | |
transaction_style="endpoint", | |
mechanism_type="asgi", | |
) | |
class LitestarIntegration(Integration): | |
identifier = "litestar" | |
@staticmethod | |
def setup_once() -> None: | |
patch_app_init() | |
patch_middlewares() | |
patch_http_route_handle() | |
def patch_app_init() -> None: | |
"""Replaces the Litestar class's `__init__` function in order to inject `after_exception` handlers and set the | |
`SentryLitestarASGIMiddleware` as the outmost middleware in the stack. | |
See: | |
- https://litestar-api.github.io/litestar/usage/0-the-litestar-app/5-application-hooks/#after-exception | |
- https://litestar-api.github.io/litestar/usage/7-middleware/0-middleware-intro/ | |
""" | |
old__init__ = Litestar.__init__ | |
def injection_wrapper(self: "Litestar", *args: "Any", **kwargs: "Any") -> None: | |
after_exception = kwargs.pop("after_exception", []) | |
kwargs.update( | |
after_exception=[ | |
exception_handler, | |
*(after_exception if isinstance(after_exception, list) else [after_exception]), | |
], | |
) | |
SentryLitestarASGIMiddleware.__call__ = SentryLitestarASGIMiddleware._run_asgi3 # type: ignore | |
middleware = kwargs.pop("middleware", None) or [] | |
kwargs["middleware"] = [SentryLitestarASGIMiddleware, *middleware] | |
old__init__(self, *args, **kwargs) | |
Litestar.__init__ = injection_wrapper | |
def patch_middlewares() -> None: | |
old__resolve_middleware_stack = BaseRouteHandler.resolve_middleware | |
def resolve_middleware_wrapper(self: "Any") -> "list[Middleware]": | |
return [enable_span_for_middleware(middleware) for middleware in old__resolve_middleware_stack(self)] | |
BaseRouteHandler.resolve_middleware = resolve_middleware_wrapper | |
def enable_span_for_middleware(middleware: "Middleware") -> "Middleware": | |
if not hasattr(middleware, "__call__") or middleware is SentryLitestarASGIMiddleware: # noqa: B004 | |
return middleware | |
if isinstance(middleware, DefineMiddleware): | |
old_call: "ASGIApp" = middleware.middleware.__call__ | |
else: | |
old_call = middleware.__call__ | |
async def _create_span_call( | |
self: "MiddlewareProtocol", | |
scope: "Scope", | |
receive: "Receive", | |
send: "Send", | |
) -> None: | |
hub = Hub.current | |
integration = hub.get_integration(LitestarIntegration) | |
if integration is not None: | |
middleware_name = self.__class__.__name__ | |
with hub.start_span( | |
op=OP.MIDDLEWARE_LITESTAR, | |
description=middleware_name, | |
) as middleware_span: | |
middleware_span.set_tag("litestar.middleware_name", middleware_name) | |
# Creating spans for the "receive" callback | |
async def _sentry_receive( | |
*args: "Any", | |
**kwargs: "Any", | |
) -> "HTTPReceiveMessage | WebSocketReceiveMessage": | |
hub = Hub.current | |
with hub.start_span( | |
op=OP.MIDDLEWARE_LITESTAR_RECEIVE, | |
description=getattr(receive, "__qualname__", str(receive)), | |
) as span: | |
span.set_tag("litestar.middleware_name", middleware_name) | |
return await receive(*args, **kwargs) | |
receive_name = getattr(receive, "__name__", str(receive)) | |
receive_patched = receive_name == "_sentry_receive" | |
new_receive = _sentry_receive if not receive_patched else receive | |
# Creating spans for the "send" callback | |
async def _sentry_send(message: "Message") -> None: | |
hub = Hub.current | |
with hub.start_span( | |
op=OP.MIDDLEWARE_LITESTAR_SEND, | |
description=getattr(send, "__qualname__", str(send)), | |
) as span: | |
span.set_tag("litestar.middleware_name", middleware_name) | |
return await send(message) | |
send_name = getattr(send, "__name__", str(send)) | |
send_patched = send_name == "_sentry_send" | |
new_send = _sentry_send if not send_patched else send | |
return await old_call(self, scope, new_receive, new_send) | |
else: | |
return await old_call(self, scope, receive, send) | |
not_yet_patched = old_call.__name__ not in ["_create_span_call"] | |
if not_yet_patched: | |
if isinstance(middleware, DefineMiddleware): | |
middleware.middleware.__call__ = _create_span_call | |
else: | |
middleware.__call__ = _create_span_call | |
return middleware | |
def patch_http_route_handle() -> None: | |
old_handle = HTTPRoute.handle | |
async def handle_wrapper( | |
self: "HTTPRoute", | |
scope: "HTTPScope", | |
receive: "Receive", | |
send: "Send", | |
) -> None: | |
hub = Hub.current | |
integration: LitestarIntegration = hub.get_integration(LitestarIntegration) | |
if integration is None: | |
return await old_handle(self, scope, receive, send) | |
with hub.configure_scope() as sentry_scope: | |
request: "Request[Any, Any]" = scope["app"].request_class( | |
scope=scope, | |
receive=receive, | |
send=send, | |
) | |
extracted_request_data = ConnectionDataExtractor( | |
parse_body=True, | |
parse_query=True, | |
)(request) | |
body = extracted_request_data.pop("body") | |
request_data = await body | |
def event_processor(event: "Event", _: "dict[str, Any]") -> "Event": | |
route_handler = scope.get("route_handler") | |
request_info = event.get("request", {}) | |
request_info["content_length"] = len(scope.get("_body", b"")) # type: ignore | |
if request_data is not None: | |
request_info["data"] = request_data | |
request_info["cookies"] = extracted_request_data["cookies"] | |
request_info["headers"] = extracted_request_data["headers"] | |
func = None | |
tx_name = None | |
if route_handler.name is not None: | |
tx_name = route_handler.name | |
elif isinstance(route_handler.fn, Ref): | |
func = route_handler.fn.value | |
else: | |
func = route_handler.fn | |
if func is not None: | |
tx_name = transaction_from_function(func) | |
tx_info = {"source": SOURCE_FOR_STYLE["endpoint"]} | |
if not tx_name: | |
tx_name = _DEFAULT_TRANSACTION_NAME | |
tx_info = {"source": TRANSACTION_SOURCE_ROUTE} | |
event.update( | |
request=request_info, | |
transaction=tx_name, | |
transaction_info=tx_info, | |
) | |
return event | |
sentry_scope._name = LitestarIntegration.identifier | |
sentry_scope.add_event_processor(event_processor) | |
return await old_handle(self, scope, receive, send) | |
HTTPRoute.handle = handle_wrapper | |
def exception_handler(exc: Exception, scope: "Scope") -> None: | |
hub = Hub.current | |
if hub.get_integration(LitestarIntegration) is None: | |
return | |
user_info: "dict[str, Any] | None" = None | |
# if _should_send_default_pii(): | |
if user_info and isinstance(user_info, dict): | |
with hub.configure_scope() as sentry_scope: | |
sentry_scope.set_user(user_info) | |
event, hint = event_from_exception( | |
exc, | |
client_options=hub.client.options if hub.client else None, | |
mechanism={"type": LitestarIntegration.identifier, "handled": False}, | |
) | |
hub.capture_event(event, hint=hint) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment