Skip to content

Instantly share code, notes, and snippets.

@tonybaloney
Last active September 19, 2021 01:41
Show Gist options
  • Save tonybaloney/f8b8bd86bf669aff1b9b882c06219392 to your computer and use it in GitHub Desktop.
Save tonybaloney/f8b8bd86bf669aff1b9b882c06219392 to your computer and use it in GitHub Desktop.
"""
Usage:
>>> trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(RichSpanExporter()))
"""
import datetime
import typing
from typing import Optional
import opentelemetry.trace
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.semconv.trace import SpanAttributes
from rich.console import Console
from rich.syntax import Syntax
from rich.text import Text
from rich.tree import Tree
def ns_to_time(nanoseconds):
    """Format a nanosecond Unix timestamp as a UTC time-of-day string.

    Args:
        nanoseconds: Time since the Unix epoch, in nanoseconds.

    Returns:
        The UTC time of day formatted as ``HH:MM:SS.ffffff``. Digits below
        one microsecond are truncated.
    """
    # Split with integer arithmetic: dividing by 1e9 would push the value
    # through a float, which cannot hold a present-day nanosecond timestamp
    # exactly and can perturb the microsecond digits.
    seconds, remainder_ns = divmod(nanoseconds, 1_000_000_000)
    # Timezone-aware replacement for the deprecated ``utcfromtimestamp``.
    ts = datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)
    ts = ts.replace(microsecond=int(remainder_ns // 1000))
    return ts.strftime("%H:%M:%S.%f")
class RichSpanExporter(SpanExporter):
    """Implementation of :class:`SpanExporter` that prints spans to the
    console as a Rich tree.

    Must be used within a BatchSpanProcessor: ``export`` nests every span
    under its parent, so a span and its parent have to arrive in the same
    batch.
    """

    def __init__(
        self,
        service_name: Optional[str] = None,
    ):
        self.service_name = service_name
        self.console = Console()

    @staticmethod
    def _field(label: str, value: object = "") -> Text:
        """Return ``"<label> : <value>"`` with the label part in bold cyan.

        The text is assembled from styled parts instead of a markup string,
        so square brackets or emoji codes inside ``label`` / ``value`` are
        shown literally rather than being parsed as Rich markup.
        """
        return Text.assemble((f"{label} :", "bold cyan"), f" {value}")

    def _child_to_tree(self, child: Tree, span: ReadableSpan):
        """Add the detail rows of ``span`` (kind, status, description,
        events and attributes) underneath the tree node ``child``.
        """
        child.add(self._field("Kind", span.kind.name))

        status = span.status
        if not status.is_unset:
            if status.is_ok:
                child.add(self._field("Status", status.status_code))
            else:
                # Anything that is set but not OK is highlighted in red.
                child.add(
                    Text.assemble(
                        ("Status :", "bold cyan"),
                        " ",
                        (f"{status.status_code}", "red"),
                    )
                )
        if status.description:
            child.add(self._field("Description", status.description))

        if span.events:
            events = child.add(label=self._field("Events"))
            for event in span.events:
                event_node = events.add(Text(event.name))
                # Tolerate an event that carries no attribute mapping.
                for key, value in (event.attributes or {}).items():
                    event_node.add(self._field(key, value))

        if span.attributes:
            attributes = child.add(label=self._field("Attributes"))
            for key, value in span.attributes.items():
                if key == SpanAttributes.DB_STATEMENT:
                    # The statement gets its own row, highlighted as SQL.
                    attributes.add(self._field(key))
                    attributes.add(Syntax(value, "sql"))
                else:
                    attributes.add(self._field(key, value))

    def _add_span(self, parent_node: Tree, span: ReadableSpan) -> Tree:
        """Append a node for ``span`` under ``parent_node``, fill in its
        detail rows and return the new node.
        """
        label = Text.assemble(
            (f"[{ns_to_time(span.start_time)}]", "blue"),
            " ",
            (span.name, "bold"),
            f", span {opentelemetry.trace.format_span_id(span.context.span_id)}",
        )
        node = parent_node.add(label=label)
        self._child_to_tree(node, span)
        return node

    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
        """Print the batch ``spans`` to the console, one tree per trace.

        Spans without a parent become the top-level entries of the tree of
        their trace; every other span is nested under its parent span.

        Args:
            spans: The batch of finished spans to render.

        Returns:
            ``SpanExportResult.SUCCESS`` (also for an empty batch).

        Raises:
            ValueError: If a span's parent is not part of ``spans``. Nothing
                is printed in that case.
        """
        if not spans:
            return SpanExportResult.SUCCESS

        trees = {}  # trace id -> Tree, in order of first appearance
        nodes = {}  # span id -> tree node already created for that span
        pending = []  # spans still waiting for their parent's node

        for span in spans:
            if span.parent:
                pending.append(span)
                continue
            trace_id = span.context.trace_id
            if trace_id not in trees:
                trees[trace_id] = Tree(
                    label=f"Trace {opentelemetry.trace.format_trace_id(trace_id)}"
                )
            nodes[span.context.span_id] = self._add_span(trees[trace_id], span)

        # Resolve the remaining spans one nesting level per pass, so the
        # position of a span inside the batch does not matter (a grandchild
        # may well be listed before its own parent). Both lists are built
        # before ``nodes`` is touched, which keeps siblings in batch order.
        while pending:
            ready = [s for s in pending if s.parent.span_id in nodes]
            if not ready:
                raise ValueError(
                    "This exporter needs to be used from a BatchSpanProcessor"
                )
            pending = [s for s in pending if s.parent.span_id not in nodes]
            for span in ready:
                nodes[span.context.span_id] = self._add_span(
                    nodes[span.parent.span_id], span
                )

        for tree in trees.values():
            self.console.print(tree)
        return SpanExportResult.SUCCESS
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment