Created
September 10, 2024 21:14
-
-
Save draincoder/5d037abf57e07d04cd1d0802549b8cfb to your computer and use it in GitHub Desktop.
FastStream span linking example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from opentelemetry import trace | |
from opentelemetry.context import Context | |
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
from opentelemetry.sdk.resources import Resource | |
from opentelemetry.sdk.trace import TracerProvider | |
from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
from opentelemetry.trace import Link | |
from faststream import FastStream | |
from faststream.nats import NatsBroker, NatsMessage | |
from faststream.nats.opentelemetry import NatsTelemetryMiddleware | |
from faststream.opentelemetry import CurrentSpan | |
resource = Resource.create(attributes={"service.name": "faststream"}) | |
tracer_provider = TracerProvider(resource=resource) | |
trace.set_tracer_provider(tracer_provider) | |
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://127.0.0.1:4317"))) | |
tracer = trace.get_tracer(__name__) | |
broker = NatsBroker(middlewares=(NatsTelemetryMiddleware(tracer_provider=tracer_provider),)) | |
app = FastStream(broker) | |
@broker.subscriber("first") | |
async def first_handler(msg: str, span: CurrentSpan) -> None: | |
# Create a link to the origin span | |
links = [Link(span.get_span_context())] | |
for i in range(10): | |
span_name = f"start second {i}" | |
with tracer.start_as_current_span(span_name, Context(), links=links) as s: | |
# Add a link to child span to the origin span | |
span.add_link(s.get_span_context()) | |
await broker.publish(msg, "second") | |
@broker.subscriber("second") | |
async def second_handler(msg: NatsMessage) -> None: | |
print(msg.body) | |
@app.after_startup | |
async def send(): | |
await broker.publish("Hello!", "first") | |
if __name__ == "__main__": | |
asyncio.run(app.run()) |
Author
draincoder
commented
Sep 10, 2024



Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment