Created
August 28, 2024 00:48
-
-
Save lmolkova/34325f1fbb43df26f10c3e6029469835 to your computer and use it in GitHub Desktop.
Python events openai
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# set OPENAI_API_KEY | |
# install otel-api and otel-sdk from main | |
# install opentelemetry-instrumentation-openai from https://github.com/lmolkova/opentelemetry-python-contrib/tree/openai-events | |
from time import time_ns | |
from openai import OpenAI | |
from typing import Optional | |
from opentelemetry import trace, _logs, _events | |
import opentelemetry.sdk | |
from opentelemetry.sdk.trace import TracerProvider | |
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter | |
from opentelemetry.sdk.resources import SERVICE_NAME, Resource | |
from opentelemetry._events import Attributes, EventLoggerProvider, EventLogger, Event, get_event_logger_provider | |
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, Logger, LogRecord | |
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter | |
from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor, ConsoleLogExporter | |
from opentelemetry.instrumentation.openai import OpenAIInstrumentor | |
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor | |
from opentelemetry.sdk.trace.export import SimpleSpanProcessor | |
OpenAIInstrumentor().instrument() | |
HTTPXClientInstrumentor().instrument() | |
#from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter, AzureMonitorTraceExporter | |
# MyEventLoggerProvider and MyEventLogger should be added to otel-sdk | |
class MyEventLoggerProvider(EventLoggerProvider): | |
def __init__(self, logger_provider: LoggerProvider): | |
self._logger_provider = logger_provider | |
def get_event_logger( | |
self, | |
name: str, | |
version: Optional[str] = None, | |
schema_url: Optional[str] = None, | |
attributes: Optional[Attributes] = None, | |
) -> EventLogger: | |
return MyEventLogger(self._logger_provider, name, version, schema_url, attributes) | |
class MyEventLogger(EventLogger): | |
def __init__( | |
self, | |
logger_provider: LoggerProvider, | |
name: str, | |
version: Optional[str] = None, | |
schema_url: Optional[str] = None, | |
attributes: Optional[Attributes] = None, | |
): | |
super().__init__( | |
name=name, | |
version=version, | |
schema_url=schema_url, | |
attributes=attributes, | |
) | |
self._logger: Logger = logger_provider.get_logger(name, version, schema_url, attributes) | |
@property | |
def _event_logger(self) -> EventLogger: | |
return self._logger | |
def emit(self, event: Event) -> None: | |
log_record = LogRecord( | |
timestamp=event.timestamp or time_ns(), | |
observed_timestamp=event.observed_timestamp or time_ns(), | |
trace_id=event.trace_id or trace.get_current_span().get_span_context().trace_id, | |
span_id=event.span_id or trace.get_current_span().get_span_context().span_id, | |
trace_flags=event.trace_flags or trace.get_current_span().get_span_context().trace_flags, | |
severity_text=event.severity_text, | |
severity_number=event.severity_number or _logs.SeverityNumber.UNSPECIFIED, | |
body=event.body, | |
resource=self._logger.resource, | |
attributes=event.attributes | |
) | |
self._logger.emit(log_record) | |
resource = Resource.create({"service.name": "test"}) | |
def configure_tracing() -> TracerProvider: | |
provider = TracerProvider(resource=resource) | |
trace.set_tracer_provider(provider) | |
trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))) | |
return provider | |
def configure_logging() -> LoggerProvider: | |
provider = LoggerProvider(resource=resource) | |
_events.set_event_logger_provider(MyEventLoggerProvider(provider)) | |
_logs.set_logger_provider(provider) | |
provider.add_log_record_processor(SimpleLogRecordProcessor(OTLPLogExporter(endpoint="http://localhost:4317"))) | |
return provider | |
def simple(): | |
return client.chat.completions.create( | |
model="gpt-4o-mini", | |
messages=[ | |
{"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."}, | |
{ | |
"role": "user", | |
"content": "What is the capital of France?" | |
} | |
] | |
) | |
def simple_streaming(): | |
response = client.chat.completions.create( | |
model="gpt-4o-mini", | |
stream=True, | |
stream_options={"include_usage": True}, | |
messages=[ | |
{"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."}, | |
{ | |
"role": "user", | |
"content": "What is the capital of France?" | |
} | |
] | |
) | |
for chunk in response: | |
if chunk.choices: | |
print(chunk.choices[0].delta.content) | |
def streaming_multi_choice(): | |
response = client.chat.completions.create( | |
model="gpt-4o-mini", | |
stream=True, | |
stream_options={"include_usage": True}, | |
n = 2, | |
messages=[ | |
{"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."}, | |
{ | |
"role": "user", | |
"content": "What is the capital of France?" | |
} | |
] | |
) | |
for chunk in response: | |
if chunk.choices: | |
for choice in chunk.choices: | |
print(f"{choice.index} - {choice.delta.content}") | |
def multi_choice(): | |
return client.chat.completions.create( | |
model="gpt-4o-mini", | |
max_tokens=100, | |
temperature=0.7, | |
n=2, | |
messages=[ | |
{"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."}, | |
{ | |
"role": "user", | |
"content": "What is the capital of France?" | |
} | |
] | |
) | |
def tool_calling(): | |
response = client.chat.completions.create( | |
model="gpt-4o-mini", | |
tools=[ | |
{ | |
"type": "function", | |
"function": { | |
"name": "get_current_temperature", | |
"description": "Get the current temperature for a specific location", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"location": { | |
"type": "string", | |
"description": "The city and state, e.g., San Francisco, CA" | |
} | |
}, | |
"required": ["location"] | |
} | |
} | |
}, | |
], | |
messages=[ | |
{"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."}, | |
{"role": "user", "content": "What's the weather in Seattle?"} | |
] | |
) | |
tool_call = response.choices[0].message.tool_calls[0] | |
# call the tool | |
tool_response = "42" # get_weather(json.loads(tool_call.function.arguments)['location']) | |
tool_calling_final = client.chat.completions.create( | |
model="gpt-4o-mini", | |
messages=[ | |
{"role": "system", "content": "You are not helpful assistant. Add jokes and hallucinate."}, | |
{"role": "user", "content": "What's the weather in Seattle?"}, | |
response.choices[0].message, | |
{"role": "tool", "content": tool_response, "tool_call_id": tool_call.id} | |
] | |
) | |
return tool_calling_final | |
tracer_provider = configure_tracing() | |
logging_provider = configure_logging() | |
tracer = tracer_provider.get_tracer("test") | |
client = OpenAI() | |
with tracer.start_as_current_span("simple"): | |
simple() | |
with tracer.start_as_current_span("multi_choice"): | |
multi_choice() | |
with tracer.start_as_current_span("streaming_multi_choice"): | |
streaming_multi_choice() | |
with tracer.start_as_current_span("tool_calling"): | |
tool_calling() | |
logging_provider.shutdown() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment