@DannyAziz
Created August 21, 2024 22:47
Perplexity Voice Search in Python
import asyncio
import websockets
import ssl
import json
import uuid as uuid_module
import wave
import struct
import re
import pyaudio
import io
import threading
"""
In general the order of operations is the following:
1. Set a "JWT" payload to auth the session
2. When we get a SID payload (that doesn't contain anything else) then we can fire off requests
3. Record the audio, send the output of create_transcription_payload and the raw bytes of the audio
4. We will receive back a payload that looks something like: [{"status":"completed","listening":true,"text":"YOUR TEXT HERE","success":true}]
5. Take that text and fire off "perplexity_ask" and "voice_over" payloads with the same UUID
6. The server will send back the stream for the text response (Which we currently ignore)
7. The server will send "audio" payloads which contains timing information and then right after that the respective audio data in bytes
8. The final audio payload will have "final: true", we can then play the audio!
"""
def record_audio():
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 44100

    p = pyaudio.PyAudio()

    # List available input devices
    print("Available input devices:")
    for i in range(p.get_device_count()):
        dev_info = p.get_device_info_by_index(i)
        if dev_info['maxInputChannels'] > 0:
            print(f"Device {i}: {dev_info['name']}")

    # Try to use the default input device
    default_input = p.get_default_input_device_info()
    print(f"Using default input device: {default_input['name']}")

    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    input_device_index=default_input['index'],
                    frames_per_buffer=CHUNK)

    print("Recording... Press Enter to stop.")
    frames = []
    recording = True

    def input_thread():
        nonlocal recording
        input()
        recording = False

    thread = threading.Thread(target=input_thread)
    thread.start()

    while recording:
        data = stream.read(CHUNK, exception_on_overflow=False)
        frames.append(data)
        # Print max amplitude to check if audio is being recorded
        amplitude = max(struct.unpack(f"{len(data)//2}h", data))
        print(f"Max amplitude: {amplitude}", end="\r")

    print("\nRecording finished.")
    stream.stop_stream()
    stream.close()
    p.terminate()

    # Convert frames to bytes
    audio_data = b''.join(frames)

    # Create in-memory WAV file
    wav_buffer = io.BytesIO()
    with wave.open(wav_buffer, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(audio_data)

    return wav_buffer.getvalue()
def create_ask_payload(query, uuid=None):
    # Generate a fresh UUID per call (a default of str(uuid4()) in the signature
    # would be evaluated only once, at function definition time)
    if uuid is None:
        uuid = str(uuid_module.uuid4())

    # Default parameters
    defaults = {
        "last_backend_uuid": None,
        "user_nextauth_id": "GET_THIS_FROM_YOUR_OWN_NETWORK_REQUESTS",
        "mode": "concise",
        "source": "ios",
        "version": "2.9",
        "is_voice_to_voice": True,
        "timezone": "America/New_York",
        "read_write_token": None,
        "use_inhouse_model": False,
        "is_related_query": False,
        "language": "en-GB",
        "frontend_uuid": uuid,
        "conversational_enabled": True,
        "search_focus": "internet",
        "ios_device_id": "GET_THIS_FROM_YOUR_OWN_NETWORK_REQUESTS"
    }

    payload = [
        "perplexity_ask",
        query,
        defaults
    ]
    return json.dumps(payload)
def create_voice_over_payload(uuid):
    payload = [
        "voice_over",
        {
            "completed": False,
            "preset": "Alex",
            "is_page": False,
            "version": "2.9",
            "ios_device_id": "GET_THIS_FROM_YOUR_OWN_NETWORK_REQUESTS",
            "source": "ios",
            "uuid": uuid
        }
    ]
    return json.dumps(payload)
def create_transcription_payload(uuid=None):
    # Generate a fresh UUID per call (same caveat as create_ask_payload)
    if uuid is None:
        uuid = str(uuid_module.uuid4())

    payload = [
        "transcription",
        {
            "source": "ios",
            "uuid": uuid,
            "audio_data": {"num": 0, "_placeholder": True},
            "ios_device_id": "GET_THIS_FROM_YOUR_OWN_NETWORK_REQUESTS",
            "audio_format": "mp4",
            "last": False,
            "version": "2.9",
            "language": "en"
        }
    ]
    return json.dumps(payload)
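# Illustrative only (mirrors what websocket_client does below with its hard-coded ack
# ids 50, 54 and 55): the transcription event goes out as a '451-50' binary-event frame
# with the recorded WAV bytes following as a separate binary message, while the ask and
# voice_over events are plain '4254'/'4255' text frames sharing one request UUID.
#
#   wav_bytes = record_audio()
#   await websocket.send(f'451-50{create_transcription_payload()}')
#   await websocket.send(wav_bytes)
#   ...
#   request_uuid = str(uuid_module.uuid4())
#   await websocket.send(f'4254{create_ask_payload(transcribed_text, request_uuid)}')
#   await websocket.send(f'4255{create_voice_over_payload(request_uuid)}')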
class AudioSegment:
    def __init__(self, uuid):
        self.uuid = uuid
        self.audio_data = b''
        self.alignment = None


audio_segments = {}
async def websocket_client():
    uri = "wss://www.perplexity.ai/socket.io/?EIO=4&transport=websocket"
    headers = {
        "User-Agent": "Ask/2.26.1/5400 (iOS; iPhone; Version 17.5.1 (Build 21F90)) isiOSOnMac/false",
        "Origin": "wss://www.perplexity.ai",
        "Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits",
        "X-Client-Name": "Perplexity-iOS",
        "X-App-Version": "2.26.1",
        "X-App-ApiClient": "ios",
        "X-App-ApiVersion": "2.9"
    }
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    async with websockets.connect(uri, extra_headers=headers, ssl=ssl_context) as websocket:
        print("Connected to WebSocket server")

        while True:
            message = await websocket.recv()
            # print(f"Received: {message}")

            if isinstance(message, bytes):
                # Handle raw audio data: attach it to the most recently created segment
                if audio_segments:
                    latest_uuid = list(audio_segments.keys())[-1]
                    audio_segments[latest_uuid].audio_data += message
                continue

            # Strip the numeric socket.io frame prefix so the remainder parses as JSON
            cleaned_message = re.sub(r'^\d+[^[{]*', '', message)
            try:
                data = json.loads(cleaned_message)

                if isinstance(data, list) and data[0] == "audio":
                    audio_info = data[1]
                    uuid = audio_info["uuid"]
                    alignment = audio_info.get("alignment")

                    if uuid not in audio_segments:
                        audio_segments[uuid] = AudioSegment(uuid)
                    if alignment:
                        audio_segments[uuid].alignment = alignment

                    if audio_info.get("data") is not None:
                        if isinstance(audio_info["data"], dict):
                            # Handle the case where data is a dictionary
                            if "_placeholder" in audio_info["data"] and audio_info["data"]["_placeholder"]:
                                # Skip placeholder data
                                continue
                            audio_data = ''.join(str(value) for value in audio_info["data"].values()).encode('utf-8')
                        elif isinstance(audio_info["data"], str):
                            # Handle the case where data is a string
                            audio_data = audio_info["data"].encode('utf-8')
                        else:
                            print(f"Unexpected data format: {type(audio_info['data'])}")
                            continue
                        audio_segments[uuid].audio_data += audio_data

                    if audio_info.get("last", False):
                        print(f"Playing audio segment with UUID: {uuid}")
                        # This is the last piece of the audio segment, play it
                        play_audio_segment(audio_segments[uuid])
                        del audio_segments[uuid]

                elif isinstance(data, list) and isinstance(data[0], dict) and "status" in data[0]:
                    # Handle transcription response
                    transcription_data = data[0]
                    if "success" in transcription_data and transcription_data["status"] == "completed" and transcription_data["success"]:
                        transcribed_text = transcription_data["text"]
                        print(f"Transcription: {transcribed_text}")

                        # Generate and send a query based on the transcribed text
                        request_uuid = str(uuid_module.uuid4())
                        ask_payload = create_ask_payload(transcribed_text, request_uuid)
                        voice_over_payload = create_voice_over_payload(request_uuid)
                        await websocket.send(f'4254{ask_payload}')
                        await websocket.send(f'4255{voice_over_payload}')

                elif isinstance(data, dict) and set(data.keys()) == {"sid"}:
                    # SID-only payload: the session is authenticated, so record and send audio
                    audio_data = record_audio()
                    transcription_payload = create_transcription_payload()
                    await websocket.send(f'451-50{transcription_payload}')
                    await websocket.send(audio_data)
                    print("sent audio")
            except json.JSONDecodeError:
                print(f"Failed to parse JSON message: {cleaned_message}")

            if message.startswith('0'):
                # Handshake packet received: connect to the namespace with the auth payload
                # PUT YOUR JWT HERE
                payload = '40/,{"perplexity_jwt":""}'
                await websocket.send(payload)
                print("Sent initialization message")
def play_audio_segment(segment):
    # Convert the audio data from bytes to 16-bit PCM samples
    samples = struct.unpack('<' + 'h' * (len(segment.audio_data) // 2), segment.audio_data)

    p = pyaudio.PyAudio()
    # Open a stream
    stream = p.open(format=pyaudio.paInt16,
                    channels=1,
                    rate=42000,
                    output=True)

    # Play the audio
    stream.write(segment.audio_data)

    # Close the stream
    stream.stop_stream()
    stream.close()
    p.terminate()

    print(f"Played audio segment with UUID: {segment.uuid}")

    # Print alignment data if available
    if segment.alignment:
        print(f"Alignment data for UUID {segment.uuid}:")
        print(json.dumps(segment.alignment, indent=2))
if __name__ == "__main__":
asyncio.run(websocket_client())