(Windows only right now) Call upon a local LLM for responses, capture and transcribe microphone audio with Whisper, and speak the replies with the Windows TTS voice.
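Setup note (a hedged sketch, assuming the standard PyPI package names): the script needs PyAudio, openai-whisper, openai==0.28, pyttsx3, and numpy, e.g. pip install pyaudio openai-whisper openai==0.28 pyttsx3 numpy. The "large" Whisper model is loaded locally, so expect a sizeable download on first run.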
# Step 1: Import necessary libraries and modules
import warnings
import pyaudio
import wave
import whisper
import openai
import os
import numpy as np
import pyttsx3
# Step 2: Initialize Text-to-Speech engine (Windows users only)
engine = pyttsx3.init()
hazel_voice_id = "HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Speech\\Voices\\Tokens\\TTS_MS_EN-GB_HAZEL_11.0"
engine.setProperty('voice', hazel_voice_id)

model_name = "meta-llama-3.1-8b-instruct"

# We use this to gracefully ignore Whisper audio hallucinations
stop_terms = [" God bless you.",
              " Undertexter av Nicolai Winther",
              " Takk for ating med.",
              " Takk.",
              " Teksting av Nicolai Winther",
              " Thank you.",
              "",
              " .",
              " Thank you for watching!",
              " Thank you for watching."]
# Step 3: Define ANSI escape sequences for text color
colors = {
    "blue": "\033[94m",
    "bright_blue": "\033[96m",
    "orange": "\033[93m",
    "yellow": "\033[93m",
    "white": "\033[97m",
    "red": "\033[91m",
    "magenta": "\033[35m",
    "bright_magenta": "\033[95m",
    "cyan": "\033[36m",
    "bright_cyan": "\033[96m",
    "green": "\033[32m",
    "bright_green": "\033[92m",
    "reset": "\033[0m"
}
# Step 4: Ignore FP16 warnings
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU")

# Step 5: Point to the LM Studio Local Inference Server
openai.api_base = "http://localhost:1234/v1"
openai.api_key = "not-needed"

# Step 6: Load the Whisper model
whisper_model = whisper.load_model("large")

# Step 7: Define audio parameters
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 8000  # Lower sample rate for faster processing
CHUNK = 1024
THRESHOLD = 40  # RMS threshold to detect voice
SILENCE_DURATION = 3  # Duration of silence to stop recording (seconds)

audio = pyaudio.PyAudio()
# Step 8: Define function to calculate RMS
def calculate_rms(data):
    # Cast to float before squaring so int16 samples cannot overflow
    numpy_data = np.frombuffer(data, dtype=np.int16).astype(np.float64)
    if len(numpy_data) == 0:  # Handle empty arrays
        return 0
    rms = np.sqrt(np.mean(numpy_data ** 2))
    return rms
# Step 9: Define function to speak text
def speak(text):
    engine.say(text)
    engine.runAndWait()

# Returns True when the chunk's RMS is loud enough to count as voice
def is_signal_detected(rms, threshold):
    return rms >= threshold
# Step 10: Define function to record audio with RMS threshold
def record_audio_with_threshold():
    stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
    print(f"{colors['green']}Listening for audio...{colors['reset']}")
    frames = []
    silent_chunks = 0
    consecutive_detected_chunks = 0  # Track consecutive chunks above threshold
    recording = False
    try:
        while True:
            data = stream.read(CHUNK, exception_on_overflow=False)
            rms = calculate_rms(data)
            if is_signal_detected(rms, THRESHOLD):
                consecutive_detected_chunks += 1
                if not recording and consecutive_detected_chunks >= 2:  # Require two consecutive chunks above the threshold
                    print(f"{colors['yellow']}Sound detected, starting recording...{colors['reset']}")
                    recording = True
                    silent_chunks = 0
                if recording:
                    frames.append(data)
            else:
                consecutive_detected_chunks = 0  # Reset consecutive detected chunks if threshold is not met
                if recording:
                    silent_chunks += 1
                    max_chunks = int(SILENCE_DURATION * RATE / CHUNK)
                    if silent_chunks > max_chunks:
                        print(f"{colors['red']}Silence detected, stopping recording.{colors['reset']}")
                        break
                    frames.append(data)

        # Save the recorded audio
        stream.stop_stream()
        stream.close()
        audio_file = "temp_audio.wav"
        wf = wave.open(audio_file, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(audio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
        return audio_file
    except Exception as e:
        print(f"{colors['red']}Error during recording: {e}{colors['reset']}")
        stream.stop_stream()
        stream.close()
        return None
# Step 11: Define function to process user input and generate response
def process_input(input_text):
    conversation = [
        {"role": "system", "content": "You are Absolom, the assistant chatbot. Respond concisely and accurately."},
        {"role": "user", "content": input_text}
    ]
    completion = openai.ChatCompletion.create(
        model=model_name,
        messages=conversation,
        temperature=0.8,
        top_p=0.9,
        top_k=40  # Forwarded to the LM Studio server; not an official OpenAI API parameter
    )
    assistant_reply = completion.choices[0].message.content
    print(f"{colors['magenta']}Absolom:{colors['reset']} {assistant_reply}")
    speak(assistant_reply)
    # Audible cue that the assistant is finished and listening again
    engine.say("Starting")
    engine.runAndWait()
# Step 12: Main loop to continuously monitor for user input
print(f"{colors['yellow']}Ready to listen. Speak aloud to interact.{colors['reset']}")
while True:
    try:
        audio_file = record_audio_with_threshold()
        if audio_file:
            transcribe_result = whisper_model.transcribe(audio_file)
            transcribed_text = transcribe_result["text"]
            # Skip known Whisper hallucinations and empty transcripts
            if transcribed_text not in stop_terms:
                print(f"{colors['blue']}You:{colors['reset']} {transcribed_text}")
                process_input(transcribed_text)
            os.remove(audio_file)  # Cleanup
    except KeyboardInterrupt:
        print("\nExiting...")
        break

# Step 13: Cleanup audio resources
audio.terminate()
openai==0.28, probably; the script uses the legacy openai.ChatCompletion API, which no longer exists in openai 1.x.
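If you would rather run a current openai client, here is a minimal sketch of the equivalent request with the 1.x API, assuming the same LM Studio endpoint and model name as the script (top_k goes through extra_body because the official API does not define it):

# Sketch only: openai>=1.0 equivalent of the legacy openai.ChatCompletion call above
from openai import OpenAI

client = OpenAI(base_url="http://localhost:1234/v1", api_key="not-needed")

completion = client.chat.completions.create(
    model="meta-llama-3.1-8b-instruct",
    messages=[
        {"role": "system", "content": "You are Absolom, the assistant chatbot. Respond concisely and accurately."},
        {"role": "user", "content": "Say hello in one sentence."},
    ],
    temperature=0.8,
    top_p=0.9,
    extra_body={"top_k": 40},  # LM Studio-specific sampling parameter
)
print(completion.choices[0].message.content)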
credit to videotronic for the original script
Here we are calling upon LM Studio as the local inference server; adjust the API base URL and model name to your needs.
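For example, a hedged sketch of pointing the script at a different host/port and asking the server which model is actually loaded, instead of hardcoding model_name (assumes the server exposes the standard /v1/models route, as LM Studio does):

# Sketch only: adjust the endpoint and discover the loaded model id (legacy openai==0.28 client)
import openai

openai.api_base = "http://localhost:1234/v1"  # change host/port to match your server
openai.api_key = "not-needed"                 # LM Studio does not check the key

models = openai.Model.list()                  # GET /v1/models
model_name = models["data"][0]["id"]          # first (usually only) model the server reports
print(f"Using model: {model_name}")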