Skip to content

Instantly share code, notes, and snippets.

@sirk390
Last active November 6, 2024 14:06
Show Gist options
  • Save sirk390/3c8791b64372e94415ebdb7330dc0ba5 to your computer and use it in GitHub Desktop.
Save sirk390/3c8791b64372e94415ebdb7330dc0ba5 to your computer and use it in GitHub Desktop.
import pprint
import json
import base64
import asyncio
import websockets
import numpy as np
import logging
import sounddevice as sd
# Configure the root logger so INFO-level (and above) records are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Placeholder credential: replace "..." with a real key before running. It is
# sent as a Bearer token in the Authorization header of the WebSocket handshake.
API_KEY = "..."
# Realtime API endpoint; the model is selected through the query string.
WEBSOCKET_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
class AsyncAudio:
    """Asyncio-friendly wrapper around blocking ``sounddevice`` streams.

    The input and output streams are opened lazily on first use. Blocking
    reads and writes are pushed to the default executor so the event loop
    stays responsive.

    Args:
        chunk: Number of frames per read and the stream block size.
        channels: Number of audio channels.
        rate: Sample rate in Hz.
        format: Sample dtype name understood by both sounddevice and numpy
            (for example ``'int16'``).
    """

    def __init__(self, chunk=1024, channels=1, rate=24000, format='int16'):
        self.chunk = chunk
        self.channels = channels
        self.rate = rate
        self.format = format
        self.stream_play = None
        self.stream_record = None

    def _close_record_stream(self):
        """Stop and release the input stream, if one is open."""
        stream, self.stream_record = self.stream_record, None
        if stream is not None:
            stream.stop()
            stream.close()

    def _close_play_stream(self):
        """Stop and release the output stream, if one is open."""
        stream, self.stream_play = self.stream_play, None
        if stream is not None:
            stream.stop()
            stream.close()

    async def record(self):
        """Read one chunk from the microphone and return it as raw bytes.

        Returns:
            The ``chunk`` frames that were read, serialised with ``tobytes()``.

        Raises:
            asyncio.CancelledError: Re-raised after the input stream has been
                closed, so the calling task really is cancelled instead of
                receiving ``None``.
        """
        if self.stream_record is None:
            self.stream_record = sd.InputStream(
                channels=self.channels,
                samplerate=self.rate,
                dtype=self.format,
                blocksize=self.chunk,
            )
            self.stream_record.start()
        # Bind the stream locally: the worker thread must not re-read
        # self.stream_record, which is reset to None on cancellation.
        stream = self.stream_record
        loop = asyncio.get_running_loop()
        try:
            # read() returns a (data, overflowed) pair; only the data is used.
            frames, _overflowed = await loop.run_in_executor(
                None, stream.read, self.chunk
            )
        except asyncio.CancelledError:
            self._close_record_stream()
            raise
        return frames.tobytes()

    async def play(self, audio_data):
        """Write raw sample bytes to the speaker.

        Args:
            audio_data: Bytes-like buffer of samples encoded as ``self.format``
                (interleaved when ``channels > 1``).

        Raises:
            asyncio.CancelledError: Re-raised after the output stream has been
                closed.
        """
        if self.stream_play is None:
            self.stream_play = sd.OutputStream(
                channels=self.channels,
                samplerate=self.rate,
                dtype=self.format,
                blocksize=self.chunk,
            )
            self.stream_play.start()
        stream = self.stream_play
        # Decode with the configured dtype rather than a hard-coded int16 so
        # the buffer always matches the dtype the stream was opened with.
        samples = np.frombuffer(audio_data, dtype=self.format)
        if self.channels > 1:
            # Multi-channel streams expect a (frames, channels) array.
            samples = samples.reshape(-1, self.channels)
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, stream.write, samples)
        except asyncio.CancelledError:
            self._close_play_stream()
            raise
class RealtimeChat:
    """Voice chat client for the OpenAI Realtime API over a WebSocket.

    Four cooperating tasks run while the connection is open:
    ``record_audio`` (microphone -> ``outqueue``), ``send_audio``
    (``outqueue`` -> server), ``receive_messages`` (server -> ``playqueue``
    and stdout) and ``play_audio`` (``playqueue`` -> speaker).

    NOTE(review): the queues and the event are created in ``__init__``. On
    Python versions older than 3.10 asyncio primitives bind to an event loop
    at construction time, so the instance should be created in a context
    compatible with the loop that later runs ``run()`` -- confirm for the
    targeted Python version.
    """

    def __init__(self):
        self.ws = None
        self.audio = AsyncAudio()
        self.outqueue = asyncio.Queue()
        self.playqueue = asyncio.Queue()
        self.cancel_play = asyncio.Event()
        # Defined up front so the worker loops never hit AttributeError when
        # they are started before run() has set the flag.
        self.is_running = False

    async def main(self):
        """Open the WebSocket and run the worker tasks until one of them ends.

        Whatever ends the session (a task failing, the connection closing or
        an outside cancellation), every worker task is cancelled and awaited
        before the connection is closed, so none is left running.
        """
        # NOTE(review): some websockets releases name this keyword
        # `additional_headers` instead of `extra_headers` -- confirm against
        # the installed version.
        headers = {"Authorization": f"Bearer {API_KEY}", "OpenAI-Beta": "realtime=v1"}
        async with websockets.connect(WEBSOCKET_URL, extra_headers=headers) as websocket:
            self.ws = websocket
            logger.info("WebSocket connection opened")
            tasks = [
                asyncio.create_task(self.record_audio()),
                asyncio.create_task(self.send_audio()),
                asyncio.create_task(self.receive_messages()),
                asyncio.create_task(self.play_audio()),
            ]
            try:
                await asyncio.gather(*tasks)
            finally:
                # gather() does not cancel siblings when one task raises.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)

    async def play_audio(self):
        """Play queued audio chunks, dropping the backlog on interruption.

        Each chunk is played in its own task, raced against ``cancel_play``.
        When the event wins, the current playback is cancelled and everything
        still queued is discarded.
        """
        while self.is_running:
            audio_data = await self.playqueue.get()
            self.cancel_play.clear()
            play_task = asyncio.create_task(self.audio.play(audio_data))
            wait_cancel = asyncio.create_task(self.cancel_play.wait())
            try:
                done, _pending = await asyncio.wait(
                    [wait_cancel, play_task], return_when=asyncio.FIRST_COMPLETED
                )
            except asyncio.CancelledError:
                # asyncio.wait() does not cancel the tasks it waits on.
                play_task.cancel()
                wait_cancel.cancel()
                raise
            if wait_cancel in done:
                play_task.cancel()
                # The queue is known to be non-empty on each pass, so the
                # non-blocking get cannot fail here.
                while not self.playqueue.empty():
                    self.playqueue.get_nowait()
            else:
                wait_cancel.cancel()
                # Report playback failures instead of leaving an unretrieved
                # task exception behind; playback of later chunks continues.
                if not play_task.cancelled() and play_task.exception() is not None:
                    logger.error("Audio playback failed: %r", play_task.exception())

    async def receive_messages(self):
        """Dispatch server events: queue audio, print transcripts, interrupt."""
        while self.is_running:
            message = await self.ws.recv()
            event = json.loads(message)
            event_type = event.get('type')
            if event_type == 'response.audio.delta':
                audio_data = base64.b64decode(event.get('delta', ''))
                self.playqueue.put_nowait(audio_data)
            elif event_type == 'response.audio_transcript.delta':
                transcript = event.get('delta', '')
                print(transcript, end="", flush=True)
            elif event_type == 'input_audio_buffer.speech_stopped':
                self.cancel_play.set()
            elif event_type == 'error':
                # Surface server-side errors rather than dropping them.
                logger.error("Realtime API error: %s", event.get('error'))
            # All other event types are intentionally ignored.

    async def send_audio(self):
        """Forward recorded chunks to the server as base64 append events."""
        while self.is_running:
            audio_data = await self.outqueue.get()
            base64_audio = base64.b64encode(audio_data).decode('utf-8')
            await self.ws.send(json.dumps({ "type": "input_audio_buffer.append", "audio": base64_audio }))

    async def record_audio(self):
        """Read microphone chunks and queue them for sending."""
        while self.is_running:
            audio_data = await self.audio.record()
            if audio_data is None:
                # The recorder yields None when it was interrupted; stop
                # rather than queueing a value that cannot be base64-encoded.
                break
            self.outqueue.put_nowait(audio_data)

    async def run(self):
        """Run the chat session until it ends or is cancelled."""
        self.is_running = True
        try:
            await self.main()
        except asyncio.CancelledError:
            pass
        finally:
            self.is_running = False
            # main() already closes the socket when its `async with` exits;
            # this is a safety net. getattr() guards against connection
            # objects that do not expose an `open` attribute.
            if self.ws is not None and getattr(self.ws, "open", False):
                await self.ws.close()
if __name__ == "__main__":
    # Script entry point: run one chat session until it ends.
    chat = RealtimeChat()
    try:
        asyncio.run(chat.run())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the session, so exit quietly
        # instead of printing a traceback.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment