Skip to content

Instantly share code, notes, and snippets.

@lmolkova
Created August 28, 2024 00:48
Show Gist options
  • Save lmolkova/34325f1fbb43df26f10c3e6029469835 to your computer and use it in GitHub Desktop.
Save lmolkova/34325f1fbb43df26f10c3e6029469835 to your computer and use it in GitHub Desktop.
Python OpenTelemetry events demo with the OpenAI client
# set OPENAI_API_KEY
# install otel-api and otel-sdk from main
# install opentelemetry-instrumentation-openai from https://github.com/lmolkova/opentelemetry-python-contrib/tree/openai-events
from time import time_ns
from openai import OpenAI
from typing import Optional
from opentelemetry import trace, _logs, _events
import opentelemetry.sdk
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry._events import Attributes, EventLoggerProvider, EventLogger, Event, get_event_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, Logger, LogRecord
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor, ConsoleLogExporter
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
# Activate auto-instrumentation before any client is created: OpenAI calls and
# the underlying httpx transport will then emit spans/events automatically.
OpenAIInstrumentor().instrument()
HTTPXClientInstrumentor().instrument()
#from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter, AzureMonitorTraceExporter
# MyEventLoggerProvider and MyEventLogger should be added to otel-sdk
class MyEventLoggerProvider(EventLoggerProvider):
    """Event-logger provider that bridges the events API onto an SDK
    :class:`LoggerProvider` (prototype for what should live in otel-sdk)."""

    def __init__(self, logger_provider: LoggerProvider):
        # SDK logger provider that backs every event logger created here.
        self._logger_provider = logger_provider

    def get_event_logger(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
        attributes: Optional[Attributes] = None,
    ) -> EventLogger:
        """Return a ``MyEventLogger`` backed by the wrapped LoggerProvider."""
        return MyEventLogger(self._logger_provider, name, version, schema_url, attributes)
class MyEventLogger(EventLogger):
    """EventLogger that converts :class:`Event` objects into ``LogRecord``s
    and emits them through an SDK :class:`Logger` obtained from the given
    :class:`LoggerProvider`."""

    def __init__(
        self,
        logger_provider: LoggerProvider,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
        attributes: Optional[Attributes] = None,
    ):
        super().__init__(
            name=name,
            version=version,
            schema_url=schema_url,
            attributes=attributes,
        )
        # Backing SDK logger that actually records the emitted events.
        self._logger: Logger = logger_provider.get_logger(
            name, version, schema_url, attributes
        )

    @property
    def _event_logger(self) -> Logger:
        # Fix: the return annotation said EventLogger, but the value is the
        # backing SDK Logger — annotation corrected to match reality.
        return self._logger

    def emit(self, event: Event) -> None:
        """Translate *event* into a ``LogRecord`` and emit it.

        Missing timestamps default to "now"; missing trace context fields
        are filled in from the currently active span.
        """
        # Fix: read the clock once (so timestamp == observed_timestamp when
        # both are unset) and look up the span context once instead of three
        # separate get_current_span().get_span_context() calls.
        now = time_ns()
        span_context = trace.get_current_span().get_span_context()
        log_record = LogRecord(
            timestamp=event.timestamp or now,
            observed_timestamp=event.observed_timestamp or now,
            trace_id=event.trace_id or span_context.trace_id,
            span_id=event.span_id or span_context.span_id,
            trace_flags=event.trace_flags or span_context.trace_flags,
            severity_text=event.severity_text,
            severity_number=event.severity_number or _logs.SeverityNumber.UNSPECIFIED,
            body=event.body,
            resource=self._logger.resource,
            attributes=event.attributes,
        )
        self._logger.emit(log_record)
resource = Resource.create({"service.name": "test"})
def configure_tracing() -> TracerProvider:
    """Create a TracerProvider exporting via OTLP/gRPC, register it globally,
    and return it."""
    provider = TracerProvider(resource=resource)
    # Fix: attach the processor to the concrete SDK provider instead of going
    # through trace.get_tracer_provider() — if a global provider was already
    # set, the API accessor could return a different/proxy provider that does
    # not expose add_span_processor. Also consistent with configure_logging.
    provider.add_span_processor(
        SimpleSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
    )
    trace.set_tracer_provider(provider)
    return provider
def configure_logging() -> LoggerProvider:
    """Create a LoggerProvider with an OTLP/gRPC exporter, register it as
    both the global logger provider and the global event-logger provider,
    and return it."""
    log_provider = LoggerProvider(resource=resource)
    exporter = OTLPLogExporter(endpoint="http://localhost:4317")
    log_provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))
    _logs.set_logger_provider(log_provider)
    _events.set_event_logger_provider(MyEventLoggerProvider(log_provider))
    return log_provider
def simple():
    """Run a single non-streaming chat completion and return the response."""
    prompt = [
        {"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."},
        {"role": "user", "content": "What is the capital of France?"},
    ]
    return client.chat.completions.create(model="gpt-4o-mini", messages=prompt)
def simple_streaming():
    """Run a streaming chat completion and print each content delta."""
    prompt = [
        {"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."},
        {"role": "user", "content": "What is the capital of France?"},
    ]
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        stream=True,
        stream_options={"include_usage": True},
        messages=prompt,
    )
    # The final usage chunk has no choices — guard before indexing.
    for chunk in stream:
        if chunk.choices:
            print(chunk.choices[0].delta.content)
def streaming_multi_choice():
    """Run a streaming chat completion with two choices and print every
    delta, prefixed with its choice index."""
    prompt = [
        {"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."},
        {"role": "user", "content": "What is the capital of France?"},
    ]
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        stream=True,
        stream_options={"include_usage": True},
        n=2,
        messages=prompt,
    )
    # The final usage chunk has no choices — guard before iterating.
    for chunk in stream:
        if chunk.choices:
            for choice in chunk.choices:
                print(f"{choice.index} - {choice.delta.content}")
def multi_choice():
    """Run a non-streaming chat completion requesting two choices with a
    token cap and temperature set; return the response."""
    prompt = [
        {"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."},
        {"role": "user", "content": "What is the capital of France?"},
    ]
    return client.chat.completions.create(
        model="gpt-4o-mini",
        max_tokens=100,
        temperature=0.7,
        n=2,
        messages=prompt,
    )
def tool_calling():
    """Two-turn tool-calling flow: ask the model for a tool call, feed back
    a canned tool result, and return the final completion."""
    weather_tool = {
        "type": "function",
        "function": {
            "name": "get_current_temperature",
            "description": "Get the current temperature for a specific location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g., San Francisco, CA",
                    }
                },
                "required": ["location"],
            },
        },
    }
    conversation = [
        {"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."},
        {"role": "user", "content": "What's the weather in Seattle?"},
    ]
    first = client.chat.completions.create(
        model="gpt-4o-mini",
        tools=[weather_tool],
        messages=conversation,
    )
    tool_call = first.choices[0].message.tool_calls[0]
    # call the tool
    tool_response = "42"  # get_weather(json.loads(tool_call.function.arguments)['location'])
    followup = conversation + [
        first.choices[0].message,
        {"role": "tool", "content": tool_response, "tool_call_id": tool_call.id},
    ]
    return client.chat.completions.create(model="gpt-4o-mini", messages=followup)
# --- demo driver: configure telemetry, run each scenario in its own span ---
tracer_provider = configure_tracing()
logging_provider = configure_logging()
tracer = tracer_provider.get_tracer("test")
client = OpenAI()  # reads OPENAI_API_KEY from the environment
with tracer.start_as_current_span("simple"):
    simple()
with tracer.start_as_current_span("multi_choice"):
    multi_choice()
with tracer.start_as_current_span("streaming_multi_choice"):
    streaming_multi_choice()
with tracer.start_as_current_span("tool_calling"):
    tool_calling()
# NOTE(review): simple_streaming() is defined above but never invoked here.
# Fix: shut down BOTH providers so pending telemetry is flushed on exit
# (the original shut down only the logging provider).
logging_provider.shutdown()
tracer_provider.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment