Skip to content

Instantly share code, notes, and snippets.

@TolstochenkoDaniil
Created January 22, 2025 10:23
Show Gist options
  • Save TolstochenkoDaniil/373aa8e456a2223745e7bb98196c41a5 to your computer and use it in GitHub Desktop.
Save TolstochenkoDaniil/373aa8e456a2223745e7bb98196c41a5 to your computer and use it in GitHub Desktop.
# Following gist is the example of sentry integration for
# faststream app (https://github.com/airtai/faststream) and only kafka broker as the use case
# Feel free to reach out if you have suggestions or want to help refactor and/or scale this integration
from typing import Any, Optional
from datetime import UTC, datetime
import sys
from faststream.broker.core.usecase import BrokerUsecase
from faststream.broker.publisher.proto import ProducerProto
from faststream.broker.subscriber.call_item import HandlerItem
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.kafka.publisher.usecase import DefaultPublisher
from faststream.broker.response import ensure_response
from faststream.exceptions import IgnoredException, HandlerException
from sentry_sdk import (
Scope,
continue_trace,
get_current_scope,
isolation_scope,
start_span,
start_transaction,
)
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.tracing import (
TRANSACTION_SOURCE_TASK,
BAGGAGE_HEADER_NAME,
)
from faststream.kafka.message import KafkaMessage
from sentry_sdk.utils import (
SENSITIVE_DATA_SUBSTITUTE,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
reraise,
)
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from sentry_sdk._types import (
Event,
Hint,
ExcInfo,
)
# NOTE(review): `{}` is an empty *dict* literal, but the name suggests a set of
# exception types (cf. FASTSTREAM_FLOW_EXCEPTIONS below); it is unused in this
# file — confirm intent (`set()` vs `{}`) before relying on it.
IGNORED_EXCEPTIONS = {}
# Exceptions faststream uses for flow control; _capture_faststream_exception
# aborts the transaction for these instead of reporting them as errors.
FASTSTREAM_FLOW_EXCEPTIONS = (IgnoredException, HandlerException)
class FastStreamKafkaIntegration(Integration):
    """Sentry integration entry point for FastStream's Kafka broker.

    Registering this integration monkey-patches FastStream's publish and
    consume paths so that each operation is traced and errors are captured.
    """

    identifier = "faststream.kafka"
    origin = "auto.queue." + identifier

    @staticmethod
    def setup_once():
        # type: () -> None
        """Install the broker/publisher/consumer patches (called once by Sentry)."""
        patch_broker_publish()
        patch_publisher()
        patch_consume()
def set_trace_headers(headers: dict) -> dict:
    """Merge Sentry trace-propagation headers into *headers* in place.

    A baggage entry is comma-appended to any baggage header already present;
    every other propagation header simply overwrites. Returns *headers* for
    caller convenience.
    """
    propagation = get_current_scope().iter_trace_propagation_headers()
    for name, value in propagation:
        if name == BAGGAGE_HEADER_NAME and headers.get(BAGGAGE_HEADER_NAME):
            headers[name] = headers[name] + "," + value
        else:
            headers[name] = value
    return headers
def _capture_faststream_exception(exc_info):
    # type: (ExcInfo) -> None
    """Record a handler exception on the current Sentry transaction.

    faststream flow-control exceptions (IgnoredException / HandlerException
    and their subclasses) only abort the transaction without emitting an
    error event; anything else marks the transaction failed and is captured
    as an unhandled error.
    """
    scope = Scope.get_current_scope()
    transaction = scope.transaction
    exc_type = exc_info[0]
    # issubclass (not `in`) so subclasses of the flow-control exceptions are
    # matched too — a plain membership test only catches the exact classes.
    if exc_type is not None and issubclass(exc_type, FASTSTREAM_FLOW_EXCEPTIONS):
        if transaction is not None:
            transaction.set_status(SPANSTATUS.ABORTED)
        return
    if transaction is not None:  # may be None outside an active transaction
        transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
    event, hint = event_from_exception(
        exc_info,
        client_options=Scope.get_client().options,
        mechanism={"type": FastStreamKafkaIntegration.identifier, "handled": False},
    )
    scope.capture_event(event, hint=hint)
def patch_broker_publish():
    # type: () -> None
    """Wrap BrokerUsecase.publish in a QUEUE_PUBLISH span and inject trace headers."""
    old_publish = BrokerUsecase.publish

    @ensure_integration_enabled(FastStreamKafkaIntegration, old_publish)
    async def _sentry_publish(
        self: BrokerUsecase,
        msg: Any,
        *,
        producer: Optional[ProducerProto],
        **kwargs,
    ) -> Optional[Any]:
        # .get(): topic/correlation_id are optional publish kwargs; indexing
        # raised KeyError whenever a caller omitted them.
        topic = kwargs.get("topic", "")
        with start_span(
            op=OP.QUEUE_PUBLISH,
            name=f"{producer.__class__.__name__}.{topic}",
            origin=FastStreamKafkaIntegration.origin,
        ) as span:
            span.set_data("messaging.message.correlation_id", kwargs.get("correlation_id"))
            span.set_data("messaging.destination.name", topic)
            # Normalize headers (absent or explicitly None) to a dict, then
            # inject the Sentry trace-propagation headers in place.
            kwargs["headers"] = kwargs.get("headers") or {}
            set_trace_headers(kwargs["headers"])
            try:
                result = await old_publish(self, msg, producer=producer, **kwargs)
                span.set_status(SPANSTATUS.OK)
                return result
            except Exception:
                span.set_status(SPANSTATUS.INTERNAL_ERROR)
                reraise(*sys.exc_info())

    BrokerUsecase.publish = _sentry_publish  # type: ignore[method-assign]
def patch_publisher():
    # type: () -> None
    """Wrap DefaultPublisher.publish in a QUEUE_PUBLISH span and inject trace headers."""
    old_publish = DefaultPublisher.publish

    @ensure_integration_enabled(FastStreamKafkaIntegration, old_publish)  # type: ignore[arg-type]
    async def _sentry_publish(
        self: DefaultPublisher,
        msg: Any,
        topic: str = "",
        **kwargs,
    ) -> Optional[Any]:
        with start_span(
            op=OP.QUEUE_PUBLISH,
            # uses the publisher's configured topic, not the per-call override
            name=f"{self._producer.__class__.__name__}.{self.topic}",
            origin=FastStreamKafkaIntegration.origin,
        ) as span:
            # .get(): correlation_id is optional; indexing raised KeyError
            # whenever the caller did not pass it explicitly.
            span.set_data("messaging.message.correlation_id", kwargs.get("correlation_id"))
            span.set_data("messaging.destination.name", self.topic)
            # Normalize headers (absent or explicitly None) before injecting
            # the Sentry trace-propagation headers in place.
            kwargs["headers"] = kwargs.get("headers") or {}
            set_trace_headers(kwargs["headers"])
            try:
                result = await old_publish(self, msg, topic=topic, **kwargs)
                span.set_status(SPANSTATUS.OK)
                return result
            except Exception:
                span.set_status(SPANSTATUS.INTERNAL_ERROR)
                reraise(*sys.exc_info())

    DefaultPublisher.publish = _sentry_publish  # type: ignore[method-assign]
def _make_event_processor(msg: KafkaMessage):
    """Build a Sentry event processor that enriches events with Kafka metadata."""

    def event_processor(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        with capture_internal_exceptions():
            topic = msg.raw_message.topic  # type: ignore[union-attr]
            event.setdefault("tags", {}).update(
                {
                    "topic": topic,
                    "event_message_id": msg.message_id,
                    "correlation_id": msg.correlation_id,
                }
            )
            # Only expose the message payload when PII sending is enabled.
            body = (
                msg.decoded_body
                if should_send_default_pii()
                else SENSITIVE_DATA_SUBSTITUTE
            )
            event.setdefault("extra", {})["event-args"] = {
                "message_id": msg.message_id,
                "correlation_id": msg.correlation_id,
                "headers": msg.headers,
                "topic": topic,
                "content_type": msg.content_type,
                "partition": msg.raw_message.partition,  # type: ignore[union-attr]
                "offset": msg.raw_message.offset,  # type: ignore[union-attr]
                "value": body,
            }
        return event

    return event_processor
def patch_consume():
    # type: () -> None
    """Instrument consumption: one QUEUE_PROCESS transaction per incoming
    message (SubscriberUsecase.process_message) plus one QUEUE_PROCESS span
    per matched handler (HandlerItem.call)."""
    old_call = HandlerItem.call

    @ensure_integration_enabled(FastStreamKafkaIntegration, old_call)
    async def _sentry_handler_item(
        handler_item: HandlerItem, /, message: KafkaMessage, **kwargs
    ) -> Any:
        scope = Scope.get_current_scope()
        scope.add_event_processor(_make_event_processor(message))
        with start_span(
            op=OP.QUEUE_PROCESS,
            name=handler_item.call_name,
            origin=FastStreamKafkaIntegration.origin,
        ) as span:
            span.set_data("messaging.message.id", message.message_id)
            span.set_data("messaging.message.correlation_id", message.correlation_id)
            span.set_data("messaging.destination.name", message.raw_message.topic)  # type: ignore[union-attr]
            span.set_data("messaging.message.body.size", len(message.body))
            span.set_data(
                "messaging.topic.partition.number",
                message.raw_message.partition,  # type: ignore[union-attr]
            )
            span.set_data(
                "messaging.topic.partition.offset", message.raw_message.offset  # type: ignore[union-attr]
            )
            # Fixed typo in the attribute key: "recieve" -> "receive", matching
            # the messaging semantic-convention spelling.
            span.set_data(
                "messaging.message.receive.latency",
                datetime.now(tz=UTC)
                # Kafka record timestamps are milliseconds since the epoch.
                - datetime.fromtimestamp(message.raw_message.timestamp / 1000, tz=UTC),  # type: ignore[union-attr]
            )
            try:
                result = ensure_response(
                    await old_call(handler_item, message=message, **kwargs),
                )
                # Propagate the trace context downstream via response headers.
                result.add_headers(set_trace_headers(message.headers))
                span.set_status(SPANSTATUS.OK)
                return result
            except Exception:
                span.set_status(SPANSTATUS.INTERNAL_ERROR)
                reraise(*sys.exc_info())

    HandlerItem.call = _sentry_handler_item

    old_process_message = SubscriberUsecase.process_message

    @ensure_integration_enabled(FastStreamKafkaIntegration, old_process_message)
    async def _sentry_process_message(self: SubscriberUsecase, message: KafkaMessage):
        with isolation_scope() as scope:
            # Best-effort scope setup; never let it break message processing.
            with capture_internal_exceptions():
                scope._name = FastStreamKafkaIntegration.identifier
                scope.clear_breadcrumbs()
            transaction = continue_trace(
                # NOTE(review): this unpacks `message.headers` as (key, value)
                # pairs and .decode()s the values; that only works if headers
                # yields 2-tuples with bytes values (raw Kafka header shape) —
                # a plain str->str dict would need `.items()` and no decode.
                # TODO confirm against the broker's actual header type.
                {k: v.decode() for k, v in message.headers},  # type: ignore
                name=self.calls[0].call_name,
                op=OP.QUEUE_PROCESS,
                source=TRANSACTION_SOURCE_TASK,
                origin=FastStreamKafkaIntegration.origin,
            )
            with start_transaction(transaction):
                try:
                    result = await old_process_message(self, message)
                    transaction.set_status(SPANSTATUS.OK)
                    return result
                except Exception:
                    exc_info = sys.exc_info()
                    _capture_faststream_exception(exc_info)
                    reraise(*exc_info)

    SubscriberUsecase.process_message = _sentry_process_message
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment