Created
January 22, 2025 10:23
-
-
Save TolstochenkoDaniil/373aa8e456a2223745e7bb98196c41a5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Following gist is the example of sentry integration for | |
# faststream app (https://github.com/airtai/faststream) and only kafka broker as the use case | |
# Feel free to contact if you have any suggestions or help for refactoring and/or scale the integration | |
from typing import Any, Optional | |
from datetime import UTC, datetime | |
import sys | |
from faststream.broker.core.usecase import BrokerUsecase | |
from faststream.broker.publisher.proto import ProducerProto | |
from faststream.broker.subscriber.call_item import HandlerItem | |
from faststream.broker.subscriber.usecase import SubscriberUsecase | |
from faststream.kafka.publisher.usecase import DefaultPublisher | |
from faststream.broker.response import ensure_response | |
from faststream.exceptions import IgnoredException, HandlerException | |
from sentry_sdk import ( | |
Scope, | |
continue_trace, | |
get_current_scope, | |
isolation_scope, | |
start_span, | |
start_transaction, | |
) | |
from sentry_sdk.consts import OP, SPANSTATUS | |
from sentry_sdk.integrations import Integration | |
from sentry_sdk.scope import should_send_default_pii | |
from sentry_sdk.tracing import ( | |
TRANSACTION_SOURCE_TASK, | |
BAGGAGE_HEADER_NAME, | |
) | |
from faststream.kafka.message import KafkaMessage | |
from sentry_sdk.utils import ( | |
SENSITIVE_DATA_SUBSTITUTE, | |
capture_internal_exceptions, | |
ensure_integration_enabled, | |
event_from_exception, | |
reraise, | |
) | |
from typing import TYPE_CHECKING | |
if TYPE_CHECKING: | |
from sentry_sdk._types import ( | |
Event, | |
Hint, | |
ExcInfo, | |
) | |
IGNORED_EXCEPTIONS = {} | |
FASTSTREAM_FLOW_EXCEPTIONS = (IgnoredException, HandlerException) | |
class FastStreamKafkaIntegration(Integration): | |
identifier = "faststream.kafka" | |
origin = f"auto.queue.{identifier}" | |
@staticmethod | |
def setup_once(): | |
# type: () -> None | |
patch_broker_publish() | |
patch_publisher() | |
patch_consume() | |
def set_trace_headers(headers: dict) -> dict: | |
for ( | |
key, | |
value, | |
) in get_current_scope().iter_trace_propagation_headers(): | |
if key == BAGGAGE_HEADER_NAME and headers.get(BAGGAGE_HEADER_NAME): | |
headers[key] += "," + value | |
else: | |
headers[key] = value | |
return headers | |
def _capture_faststream_exception(exc_info): | |
# type: (ExcInfo) -> None | |
scope = Scope.get_current_scope() | |
if exc_info[0] in FASTSTREAM_FLOW_EXCEPTIONS: | |
scope.transaction.set_status(SPANSTATUS.ABORTED) | |
return | |
scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR) | |
event, hint = event_from_exception( | |
exc_info, | |
client_options=Scope.get_client().options, | |
mechanism={"type": FastStreamKafkaIntegration.identifier, "handled": False}, | |
) | |
scope.capture_event(event, hint=hint) | |
def patch_broker_publish(): | |
# type: () -> None | |
old_publish = BrokerUsecase.publish | |
@ensure_integration_enabled(FastStreamKafkaIntegration, old_publish) | |
async def _sentry_publish( | |
self: BrokerUsecase, | |
msg: Any, | |
*, | |
producer: Optional[ProducerProto], | |
**kwargs, | |
) -> Optional[Any]: | |
with start_span( | |
op=OP.QUEUE_PUBLISH, | |
name=f"{producer.__class__.__name__}.{kwargs['topic']}", | |
origin=FastStreamKafkaIntegration.origin, | |
) as span: | |
span.set_data("messaging.message.correlation_id", kwargs["correlation_id"]) | |
span.set_data("messaging.destination.name", kwargs["topic"]) | |
kwargs["headers"] = kwargs.get("headers", {}) or {} | |
set_trace_headers(kwargs["headers"]) | |
try: | |
result = await old_publish(self, msg, producer=producer, **kwargs) | |
span.set_status(SPANSTATUS.OK) | |
return result | |
except Exception: | |
span.set_status(SPANSTATUS.INTERNAL_ERROR) | |
reraise(*sys.exc_info()) | |
BrokerUsecase.publish = _sentry_publish # type: ignore[method-assign] | |
def patch_publisher(): | |
# type: () -> None | |
old_publish = DefaultPublisher.publish | |
@ensure_integration_enabled(FastStreamKafkaIntegration, old_publish) # type: ignore[arg-type] | |
async def _sentry_publish( | |
self: DefaultPublisher, | |
msg: Any, | |
topic: str = "", | |
**kwargs, | |
) -> Optional[Any]: | |
with start_span( | |
op=OP.QUEUE_PUBLISH, | |
name=f"{self._producer.__class__.__name__}.{self.topic}", | |
origin=FastStreamKafkaIntegration.origin, | |
) as span: | |
span.set_data("messaging.message.correlation_id", kwargs["correlation_id"]) | |
span.set_data("messaging.destination.name", self.topic) | |
kwargs["headers"] = kwargs.get("headers", {}) or {} | |
set_trace_headers(kwargs["headers"]) | |
try: | |
result = await old_publish(self, msg, topic=topic, **kwargs) | |
span.set_status(SPANSTATUS.OK) | |
return result | |
except Exception: | |
span.set_status(SPANSTATUS.INTERNAL_ERROR) | |
reraise(*sys.exc_info()) | |
DefaultPublisher.publish = _sentry_publish # type: ignore[method-assign] | |
def _make_event_processor(msg: KafkaMessage): | |
def event_processor(event, hint): | |
# type: (Event, Hint) -> Optional[Event] | |
with capture_internal_exceptions(): | |
tags = event.setdefault("tags", {}) | |
tags["topic"] = msg.raw_message.topic # type: ignore[union-attr] | |
tags["event_message_id"] = msg.message_id | |
tags["correlation_id"] = msg.correlation_id | |
extra = event.setdefault("extra", {}) | |
extra["event-args"] = { | |
"message_id": msg.message_id, | |
"correlation_id": msg.correlation_id, | |
"headers": msg.headers, | |
"topic": msg.raw_message.topic, # type: ignore[union-attr] | |
"content_type": msg.content_type, | |
"partition": msg.raw_message.partition, # type: ignore[union-attr] | |
"offset": msg.raw_message.offset, # type: ignore[union-attr] | |
"value": ( | |
msg.decoded_body | |
if should_send_default_pii() | |
else SENSITIVE_DATA_SUBSTITUTE | |
), | |
} | |
return event | |
return event_processor | |
def patch_consume(): | |
old_call = HandlerItem.call | |
@ensure_integration_enabled(FastStreamKafkaIntegration, old_call) | |
async def _sentry_handler_item( | |
handler_item: HandlerItem, /, message: KafkaMessage, **kwargs | |
) -> Any: | |
scope = Scope.get_current_scope() | |
scope.add_event_processor(_make_event_processor(message)) | |
with start_span( | |
op=OP.QUEUE_PROCESS, | |
name=handler_item.call_name, | |
origin=FastStreamKafkaIntegration.origin, | |
) as span: | |
span.set_data("messaging.message.id", message.message_id) | |
span.set_data("messaging.message.correlation_id", message.correlation_id) | |
span.set_data("messaging.destination.name", message.raw_message.topic) # type: ignore[union-attr] | |
span.set_data("messaging.message.body.size", len(message.body)) | |
span.set_data( | |
"messaging.topic.partition.number", | |
message.raw_message.partition, # type: ignore[union-attr] | |
) | |
span.set_data( | |
"messaging.topic.partition.offset", message.raw_message.offset # type: ignore[union-attr] | |
) | |
span.set_data( | |
"messaging.message.recieve.latency", | |
datetime.now(tz=UTC) | |
- datetime.fromtimestamp(message.raw_message.timestamp / 1000, tz=UTC), # type: ignore[union-attr] | |
) | |
try: | |
result = ensure_response( | |
await old_call(handler_item, message=message, **kwargs), | |
) | |
result.add_headers(set_trace_headers(message.headers)) | |
span.set_status(SPANSTATUS.OK) | |
return result | |
except Exception: | |
span.set_status(SPANSTATUS.INTERNAL_ERROR) | |
reraise(*sys.exc_info()) | |
HandlerItem.call = _sentry_handler_item | |
old_process_message = SubscriberUsecase.process_message | |
@ensure_integration_enabled(FastStreamKafkaIntegration, old_process_message) | |
async def _sentry_process_message(self: SubscriberUsecase, message: KafkaMessage): | |
with isolation_scope() as scope: | |
with capture_internal_exceptions(): | |
scope._name = FastStreamKafkaIntegration.identifier | |
scope.clear_breadcrumbs() | |
transaction = continue_trace( | |
{k: v.decode() for k, v in message.headers}, # type: ignore | |
name=self.calls[0].call_name, | |
op=OP.QUEUE_PROCESS, | |
source=TRANSACTION_SOURCE_TASK, | |
origin=FastStreamKafkaIntegration.origin, | |
) | |
with start_transaction(transaction): | |
try: | |
result = await old_process_message(self, message) | |
transaction.set_status(SPANSTATUS.OK) | |
return result | |
except Exception: | |
exc_info = sys.exc_info() | |
_capture_faststream_exception(exc_info) | |
reraise(*exc_info) | |
SubscriberUsecase.process_message = _sentry_process_message |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment