Created
April 9, 2025 14:46
-
-
Save lordlinus/fa7d703ff02ac7a38715b9e421f68a9d to your computer and use it in GitHub Desktop.
Voice model to output JSON
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import streamlit as st | |
import base64 | |
import os | |
import json | |
from openai import AzureOpenAI | |
from dotenv import load_dotenv | |
from datetime import date, datetime | |
from typing import Optional | |
from pydantic import BaseModel, Field | |
import tempfile | |
import io | |
import time | |
# Load environment variables | |
load_dotenv(override=True) | |
# Define models (copied from test.py) | |
class CurrencyModel(BaseModel):
    """Currency of a receipt; each field is optional and defaults to Singapore dollars."""

    code: Optional[str] = "SGD"
    name: Optional[str] = "Singapore Dollar"
    symbol: Optional[str] = "$"
class ReceiptModel(BaseModel):
    """Hospital receipt record; every field has a placeholder default.

    The model can therefore be instantiated with no arguments, and any field
    may be overridden at construction time.
    """

    number: Optional[str] = Field(default="R12345")
    # default_factory defers the call to instance creation. The previous
    # ``default=date.today()`` was evaluated once, when the class body ran,
    # so every later instance inherited that frozen date.
    receiptDate: Optional[date] = Field(default_factory=date.today)
    admissionDate: Optional[date] = Field(default_factory=date.today)
    dischargeDate: Optional[date] = Field(default_factory=date.today)
    hospitalName: Optional[str] = Field(default="KK Hospital")
    # A fresh CurrencyModel is built for each receipt.
    currency: CurrencyModel = Field(default_factory=CurrencyModel)
    amount: Optional[float] = Field(default=0.0)
# Custom JSON encoder to handle date objects | |
class DateEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``date`` objects as ISO-8601 strings.

    ``datetime`` is a subclass of ``date`` and is therefore covered as well.
    """

    def default(self, obj):
        """Return the ISO string for dates; defer everything else to the base class."""
        if not isinstance(obj, date):
            # The base implementation raises TypeError for unsupported types.
            return super().default(obj)
        return obj.isoformat()
def generate_text_from_audio(client, audio_data):
    """Ask the audio chat model to rewrite a sample receipt per a spoken command.

    Args:
        client: Client object exposing ``chat.completions.create``.
        audio_data: Bytes-like recording; it is base64-encoded and sent
            labelled as ``wav``.

    Returns:
        A ``(original_json, model_reply)`` tuple. ``original_json`` is the
        sample receipt serialised with ``model_dump_json()``. ``model_reply``
        is the content of the first completion choice, or ``None`` when the
        request raised (the error is then shown via ``st.error``).
    """
    # Sample receipt that serves as the "before" state handed to the model.
    sample_currency = CurrencyModel(code="SGD", name="Singapore Dollar", symbol="$")
    receipt_obj = ReceiptModel(
        number="R12345",
        receiptDate=date.today(),
        admissionDate=date.today(),
        dischargeDate=date.today(),
        hospitalName="City Hospital",
        currency=sample_currency,
        amount=1234.56,
    )

    # The API payload carries the recording as base64 text.
    audio_b64 = base64.b64encode(audio_data).decode("utf-8")

    try:
        instruction_part = {
            "type": "text",
            "text": f"You need to output JSON and nothing else. User is requesting to change certain fields in the JSON schema and you need to change and output the entire JSON and nothing else. Schema is {receipt_obj.model_dump_json()}. If you output anything else other than JSON the downstream will break. Please output JSON only.",
        }
        audio_part = {
            "type": "input_audio",
            "input_audio": {"data": audio_b64, "format": "wav"},
        }
        # Single user turn: the text instruction followed by the recording.
        response = client.chat.completions.create(
            model="gpt-4o-audio-preview",
            modalities=["text"],
            messages=[{"role": "user", "content": [instruction_part, audio_part]}],
        )
        reply = response.choices[0].message.content
        return receipt_obj.model_dump_json(), reply
    except Exception as e:
        # UI boundary: surface the failure to the user instead of crashing the page.
        st.error(f"Error calling Azure OpenAI endpoint: {e}")
        return receipt_obj.model_dump_json(), None
def _describe_changes(original, updated):
    """Return one bullet line per field whose value differs between two dicts.

    Only keys present in both mappings are compared. When both values are
    dicts (e.g. ``currency``), their sub-keys are compared one level deep and
    reported as ``key.subkey``.
    """
    changes = []
    for key, old_value in original.items():
        if key not in updated:
            continue
        new_value = updated[key]
        if isinstance(old_value, dict) and isinstance(new_value, dict):
            # Handle nested objects like currency
            for subkey, old_sub in old_value.items():
                if subkey in new_value and old_sub != new_value[subkey]:
                    changes.append(
                        f"- {key}.{subkey}: '{old_sub}' → '{new_value[subkey]}'"
                    )
        elif old_value != new_value:
            changes.append(f"- {key}: '{old_value}' → '{new_value}'")
    return changes


def main():
    """Render the page: record a voice command, send it to the model, show the result.

    The original and updated JSON strings are kept in ``st.session_state`` so
    they remain visible across Streamlit reruns.
    """
    st.set_page_config(page_title="Voice to JSON Modifier", layout="wide")
    st.title("Voice to JSON Modifier")
    st.write(
        "Speak to modify the JSON fields. You can say things like 'Change hospital name to Mount Elizabeth' or 'Set amount to 5000'"
    )

    # Initialize OpenAI client from environment configuration
    client = AzureOpenAI(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_version="2025-01-01-preview",
        azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
    )

    # Create columns for before/after display (declared before the input
    # widgets, filled in once results exist)
    col1, col2 = st.columns(2)

    # Instructions for voice recording
    st.subheader("Voice Input")
    st.write("Click the microphone button below and speak your command clearly.")

    # Use the audio_input function for browser-based recording
    audio_data = st.audio_input("Record your voice command:")

    # Initialize session state for storing processed results
    if "original_json" not in st.session_state:
        st.session_state.original_json = None
    if "updated_json" not in st.session_state:
        st.session_state.updated_json = None

    # Create a button to process the audio
    process_button = st.button("Process Audio")

    # Show recording status
    if audio_data is not None:
        st.success("Voice recorded successfully!")
        st.audio(audio_data, format="audio/wav")

    # Process the audio when button is clicked
    if process_button:
        if audio_data is None:
            st.warning("Please record your voice first")
        else:
            with st.spinner("Processing your voice command..."):
                st.session_state.original_json, st.session_state.updated_json = (
                    generate_text_from_audio(client, audio_data.getbuffer())
                )

    # Display results if available
    if st.session_state.original_json and st.session_state.updated_json:
        with col1:
            st.subheader("Original JSON")
            st.json(st.session_state.original_json)
        with col2:
            st.subheader("Updated JSON (After Voice Command)")
            st.json(st.session_state.updated_json)

        # Show what changed
        st.subheader("Changes Made")
        raw_reply = st.session_state.updated_json
        try:
            original_dict = json.loads(st.session_state.original_json)
            updated_dict = json.loads(raw_reply)
        except json.JSONDecodeError:
            original_dict = updated_dict = None

        # A reply that parses but is not a JSON object (null, number, string,
        # list) cannot be diffed field by field; treat it like a parse error
        # instead of letting the comparison raise TypeError.
        if not (isinstance(original_dict, dict) and isinstance(updated_dict, dict)):
            st.error("Error parsing the updated JSON")
            st.write("Raw response:", raw_reply)
        else:
            changes = _describe_changes(original_dict, updated_dict)
            if changes:
                for change in changes:
                    st.write(change)
            else:
                st.write("No changes detected")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment