Skip to content

Instantly share code, notes, and snippets.

@jedi4ever
Last active February 21, 2025 08:14
Show Gist options
  • Save jedi4ever/ccc7239f8ce3b561b8e59734ab7d4102 to your computer and use it in GitHub Desktop.
Save jedi4ever/ccc7239f8ce3b561b8e59734ab7d4102 to your computer and use it in GitHub Desktop.
openai realtime azure python example
import asyncio
import websockets
import json
import os
from dotenv import load_dotenv
import pyaudio
import numpy as np
import base64
import time
# Load settings from a local .env file (if present) into the process environment.
load_dotenv()

# To target the OpenAI-hosted API instead of Azure, use:
# key = os.getenv("OPENAI_API_KEY")
# url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"

# Azure OpenAI settings.
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")  # ex. https://my-eastus2-openai-resource.openai.azure.com/
deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT")  # ex. gpt-4o-realtime-preview
key = os.getenv("AZURE_OPENAI_API_KEY")  # API key for the Azure OpenAI resource

# Fail early with a readable message instead of an AttributeError on None below.
_missing = [
    name
    for name, value in (
        ("AZURE_OPENAI_ENDPOINT", endpoint),
        ("AZURE_OPENAI_DEPLOYMENT", deployment),
        ("AZURE_OPENAI_API_KEY", key),
    )
    if not value
]
if _missing:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing)
    )

# Strip the https:// scheme and any trailing slash so the websocket URL below
# does not end up with a double slash before "openai/realtime".
endpoint = endpoint.replace("https://", "").rstrip("/")
url = f"wss://{endpoint}/openai/realtime?deployment={deployment}&api-version=2024-10-01-preview"
# Resulting shape:
# wss://my-eastus2-openai-resource.openai.azure.com/openai/realtime?deployment=gpt-4o-realtime-preview&api-version=2024-10-01-preview

# Only the URL is printed; the API key is a secret and must not be echoed.
print(url)
async def connect():
    """Open the realtime websocket and run the receiver, sender and recorder.

    Connects to the module-level ``url`` using the module-level ``key`` as the
    ``api-key`` header, then runs ``receive_messages``, ``send_messages`` and
    ``record_audio`` concurrently on that connection until all of them finish
    or one of them raises.
    """
    headers = {
        "api-key": key,  # with an api-key header no Authorization header is needed
        # "Authorization": "Bearer " + credential.key,
        # "OpenAI-Beta": "realtime=v1",
    }
    # NOTE(review): newer websockets releases appear to name this parameter
    # ``additional_headers`` -- confirm against the installed version.
    async with websockets.connect(url, extra_headers=headers) as websocket:
        print("Connected to server.")
        # All three are coroutine functions, so they are scheduled directly on
        # the event loop. Passing an ``async def`` to ``asyncio.to_thread``
        # would only create a coroutine object in the worker thread without
        # running it there.
        await asyncio.gather(
            receive_messages(websocket),
            send_messages(websocket),
            record_audio(websocket),
        )
# Handle messages coming back from the server.
async def receive_messages(websocket):
    """Read server events from ``websocket`` and play audio deltas.

    ``response.audio.delta`` events carry base64-encoded audio that is decoded
    and written to a 24 kHz, mono, 16-bit PyAudio output stream.
    ``response.done`` events are printed in full; for any other event only its
    type is printed.

    The loop runs until ``websocket.recv()`` raises (for example when the
    connection closes) or the task is cancelled; the audio resources are
    released on every exit path.

    Args:
        websocket: Open realtime connection; only its ``recv`` coroutine is used.
    """
    # Local import: only this function needs the executor.
    from concurrent.futures import ThreadPoolExecutor

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
        try:
            loop = asyncio.get_running_loop()
            # A single worker keeps writes in order. Leaving the ``with`` block
            # waits for an in-flight write, so the stream is never closed while
            # the worker thread is still using it.
            with ThreadPoolExecutor(max_workers=1) as player:
                while True:
                    message = await websocket.recv()
                    message_data = json.loads(message)
                    message_type = message_data.get("type")
                    if message_type == "response.done":
                        print(message_data)
                    elif message_type == "response.audio.delta":
                        delta = message_data.get("delta")
                        if delta:  # skip events without an audio payload
                            audio = base64.b64decode(delta)
                            # stream.write blocks; run it off the event loop so
                            # other tasks keep running during playback.
                            await loop.run_in_executor(player, stream.write, audio)
                    else:
                        print(f"Received message type: {message_type}")
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
# Stream microphone audio to the server.
async def record_audio(websocket):
    """Capture microphone audio and send it to the server in 100 ms chunks.

    First sends a ``session.update`` event that enables ``server_vad`` turn
    detection and ``whisper-1`` input transcription. Then reads 16-bit mono
    audio at 24 kHz from a PyAudio input stream and forwards every chunk as a
    base64-encoded ``input_audio_buffer.append`` event until ``record_seconds``
    have elapsed. The audio resources are released on every exit path.

    Args:
        websocket: Open realtime connection; only its ``send`` coroutine is used.
    """
    # Local import: only this function needs the executor.
    from concurrent.futures import ThreadPoolExecutor

    sample_rate = 24000
    duration_ms = 100
    chunk_size = sample_rate * duration_ms // 1000  # frames per chunk (2400)
    sample_format = pyaudio.paInt16
    channels = 1  # mono
    record_seconds = 500

    p = pyaudio.PyAudio()
    try:
        # Open the microphone stream.
        stream = p.open(format=sample_format,
                        channels=channels,
                        rate=sample_rate,
                        input=True,
                        frames_per_buffer=chunk_size)
        try:
            await websocket.send(json.dumps({
                "type": "session.update",
                "session": {
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.5,
                        "prefix_padding_ms": 300,
                        "silence_duration_ms": 200
                    },
                    "input_audio_transcription": {
                        "model": "whisper-1"
                    }
                }
            }))
            print(f"Listening to microphone for {record_seconds} seconds...")

            loop = asyncio.get_running_loop()
            chunk_counter = 0
            start_time = time.monotonic()  # monotonic: immune to wall-clock changes
            # Leaving the ``with`` block waits for an in-flight read, so the
            # stream is never closed while the worker thread is still using it.
            with ThreadPoolExecutor(max_workers=1) as reader:
                while time.monotonic() - start_time < record_seconds:
                    # stream.read blocks until a full chunk is captured. Running
                    # it in a worker thread keeps the event loop free and paces
                    # the loop at the capture rate, so no extra sleep is needed.
                    data = await loop.run_in_executor(reader, stream.read, chunk_size)
                    # The captured bytes are already 16-bit mono PCM; encode as-is.
                    base64_audio = base64.b64encode(data).decode('utf-8')
                    chunk_counter += 1
                    print(f"sending audio chunk {chunk_counter}")
                    await websocket.send(json.dumps({
                        "type": "input_audio_buffer.append",
                        "audio": base64_audio
                    }))
        finally:
            # Stop and close the stream.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
    print("Finished recording.")
    # No "input_audio_buffer.commit" event is sent: it is not necessary as the
    # server will detect the end of the audio stream.
# Send the initial response request to the server.
async def send_messages(websocket):
    """Send a single ``response.create`` event over ``websocket``.

    The request asks for the ``text`` modality with a generic assistant
    instruction.

    Args:
        websocket: Open realtime connection; only its ``send`` coroutine is used.
    """
    response_options = {
        "modalities": ["text"],
        "instructions": "Please assist the user.",
    }
    request = {"type": "response.create", "response": response_options}
    payload = json.dumps(request)
    await websocket.send(payload)
if __name__ == "__main__":
    # Ctrl+C is the usual way to stop the script, so exit quietly instead of
    # printing a KeyboardInterrupt traceback.
    try:
        asyncio.run(connect())
    except KeyboardInterrupt:
        print("Interrupted, shutting down.")
pyaudio
websockets
python-dotenv
numpy
@VarunChopra11
Copy link

Hi Patrick,

I really like this code of yours implementing the GPT-4 real-time API.

I just wanted to suggest adding one more feature: whenever a response is still being generated or spoken by the model and the user starts speaking a new command, the model should stop responding to the previous command and immediately start processing the new one, similar to how ChatGPT's voice mode, Alexa, and other voice assistants work.

This is the only feature I feel is missing from the implementation. Could you help me make these changes?

It would be very kind of you to help me with this.

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment