Skip to content

Instantly share code, notes, and snippets.

@vmagamedov
Created August 24, 2021 15:25
Show Gist options
  • Save vmagamedov/19a29f7a4f8f70d76bbc797a0e994112 to your computer and use it in GitHub Desktop.
Save vmagamedov/19a29f7a4f8f70d76bbc797a0e994112 to your computer and use it in GitHub Desktop.
import asyncio
from contextvars import ContextVar
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.context import attach, detach
from opentelemetry.propagate import extract
import grpclib
import opentelemetry.trace.status
from grpclib.events import RecvRequest, SendTrailingMetadata, listen
from grpclib.utils import graceful_exit
from grpclib.server import Server, Stream
# generated by protoc
from .helloworld_pb2 import HelloRequest, HelloReply
from .helloworld_grpc import GreeterBase
_server_span = ContextVar("server_span")
def _wrap(func):
async def _wrapper(stream):
try:
await func(stream)
except Exception as exc:
_, span, _ = _server_span.get()
span.record_exception(exc)
raise
return _wrapper
async def _recv_request(event: RecvRequest) -> None:
tracer = get_tracer(__name__)
token = attach(extract(event.metadata))
service, _, method = event.method_name.lstrip("/").partition("/")
span_ctx = tracer.start_as_current_span(
event.method_name,
kind=SpanKind.SERVER,
attributes={
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_METHOD: method,
SpanAttributes.RPC_SERVICE: service,
},
)
span = span_ctx.__enter__()
event.method_func = _wrap(event.method_func)
_server_span.set((span_ctx, span, token))
async def _send_trailing_metadata(event: SendTrailingMetadata) -> None:
span_ctx, span, token = _server_span.get()
if event.status is not grpclib.Status.OK:
span.set_status(
opentelemetry.trace.status.Status(
status_code=opentelemetry.trace.status.StatusCode.ERROR,
description="{}:{}".format(event.status.value,
event.status_message),
)
)
span_ctx.__exit__(None, None, None)
detach(token)
class Greeter(GreeterBase):
async def SayHello(self, stream: Stream[HelloRequest, HelloReply]) -> None:
request = await stream.recv_message()
assert request is not None
1/0
message = f'Hello, {request.name}!'
await stream.send_message(HelloReply(message=message))
async def main(*, host: str = '127.0.0.1', port: int = 50051) -> None:
provider = TracerProvider()
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
server = Server([Greeter()])
listen(server, RecvRequest, _recv_request)
listen(server, SendTrailingMetadata, _send_trailing_metadata)
# Note: graceful_exit isn't supported in Windows
with graceful_exit([server]):
await server.start(host, port)
print(f'Serving on {host}:{port}')
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment