@fareesh
Created March 10, 2024 23:29
Poor Man's Siri
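
Three small scripts glued together into a hands-free LLM assistant: an Assistant class (saved as assistant.py, per the import in the last file) that captures microphone audio, gates it with voice activity detection, and transcribes it with whisper.cpp; a one-shot trigger client; and a server that forwards the transcription to Gemini and either streams the reply into a window or speaks it with Piper TTS via Dimits.
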
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
A modified version of the "assistant" example from pywhispercpp
"""
import queue
import time
from typing import Callable
import numpy as np
import sounddevice as sd
import pywhispercpp.constants as constants
import logging
from pywhispercpp.model import Model
import webrtcvad


class Assistant:
    def __init__(
        self,
        model="tiny",
        input_device: int = None,
        silence_threshold: int = 8,
        q_threshold: int = 16,
        block_duration: int = 30,
        commands_callback: Callable[[str], None] = None,
        model_log_level: int = logging.INFO,
        **model_params,
    ):
        """
        :param model: whisper.cpp model name or a direct path to a `ggml` model
        :param input_device: the input device (i.e. microphone); keep it None to use the default
        :param silence_threshold: the number of consecutive silent blocks after which inference runs
        :param q_threshold: inference won't run until the data queue has at least `q_threshold` elements
        :param block_duration: duration of each audio block in ms
        :param commands_callback: the callback to run when a command is transcribed
        :param model_log_level: logging level
        :param model_params: any other parameters to pass to the whisper.cpp model; see pywhispercpp.constants.PARAMS_SCHEMA
        """
        self.running = True
        self.input_device = input_device
        self.sample_rate = constants.WHISPER_SAMPLE_RATE  # same as whisper.cpp
        self.channels = 1  # same as whisper.cpp
        self.block_duration = block_duration
        self.block_size = int(self.sample_rate * self.block_duration / 1000)
        self.q = queue.Queue()
        self.silence_threshold = silence_threshold
        self.q_threshold = q_threshold
        self._silence_counter = 0
        self.pwccp_model = Model(
            model,
            log_level=model_log_level,
            print_realtime=False,
            print_progress=False,
            print_timestamps=False,
            single_segment=True,
            no_context=True,
            **model_params,
        )
        self.commands_callback = commands_callback
        # webrtcvad aggressiveness ranges from 0 (least aggressive) to 3 (most)
        self.vad = webrtcvad.Vad(1)

    def _audio_callback(self, indata, frames, time, status):
        """
        This is called (from a separate thread) for each audio block.
        """
        if status:
            logging.warning(f"underlying audio stack warning: {status}")
        assert frames == self.block_size
        audio_data = map(
            lambda x: (x + 1) / 2, indata
        )  # normalize from [-1, +1] to [0, 1]
        # float16 has the same byte width as the 16-bit PCM webrtcvad expects,
        # so the buffer length matches what is_speech() wants for this block size
        audio_data = np.fromiter(audio_data, np.float16)
        audio_data = audio_data.tobytes()
        detection = self.vad.is_speech(audio_data, self.sample_rate)
        if detection:
            self._silence_counter = 0
            self.q.put(indata.copy())
        else:
            if self._silence_counter >= self.silence_threshold:
                if self.q.qsize() > self.q_threshold:
                    self._transcribe_speech()
                self._silence_counter = 0
            else:
                self._silence_counter += 1

    def _transcribe_speech(self):
        logging.info("Speech detected ...")
        audio_data = np.array([])
        while self.q.qsize() > 0:
            # get all the data from the q
            audio_data = np.append(audio_data, self.q.get())
        # Appending zeros to the audio data as a workaround for small audio packets (small commands)
        audio_data = np.concatenate(
            [audio_data, np.zeros((int(self.sample_rate) + 10))]
        )
        # running the inference
        res = self.pwccp_model.transcribe(audio_data)
        self._new_segment_callback(res)

    def _new_segment_callback(self, seg):
        if self.commands_callback:
            self.commands_callback(seg[0].text)

    def stop(self) -> None:
        print("Assistant stopped")
        self.running = False

    def start(self) -> None:
        """
        Use this function to start the assistant.
        :return: None
        """
        logging.info("Starting Assistant ...")
        self.running = True
        with sd.InputStream(
            device=self.input_device,  # the default input device
            channels=self.channels,
            samplerate=constants.WHISPER_SAMPLE_RATE,
            blocksize=self.block_size,
            callback=self._audio_callback,
        ):
            try:
                logging.info("Assistant is listening ... (CTRL+C to stop)")
                while self.running is True:
                    time.sleep(0.1)
            except KeyboardInterrupt:
                logging.info("Assistant stopped")

    @staticmethod
    def available_devices():
        return sd.query_devices()


def _main():
    # `args` was undefined in the original snippet; minimal argparse wiring
    # (an assumption) makes the example runnable
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="tiny")
    parser.add_argument("--input_device", type=int, default=None)
    parser.add_argument("--silence_threshold", type=int, default=8)
    parser.add_argument("--block_duration", type=int, default=30)
    args = parser.parse_args()
    my_assistant = Assistant(
        model=args.model,
        input_device=args.input_device,
        silence_threshold=args.silence_threshold,
        block_duration=args.block_duration,
        commands_callback=print,
    )
    my_assistant.start()


if __name__ == "__main__":
    _main()
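
Next, the trigger client. It connects to the server below on localhost:9876 and sends one of the four mode strings as the payload.
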
import socket
import sys
# CLI args are: TEXT, TEXT_CLIPBOARD, AUDIO, AUDIO_CLIPBOARD for the 4 modes
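# e.g. `python trigger.py AUDIO_CLIPBOARD` (the filename trigger.py is illustrative)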
def trigger_action():
    PORT = 9876
    try:
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client_socket.connect(("localhost", PORT))
        client_socket.send(sys.argv[1].encode("utf-8"))
        print("Trigger sent.")
        client_socket.close()
    except ConnectionRefusedError:
        print("Failed to connect to the server. Make sure the server is running.")


if __name__ == "__main__":
    trigger_action()
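
Finally, the server. It waits for a trigger, plays a chime, starts the Assistant defined above, and hands the transcription to Gemini: TEXT modes stream the reply into a CustomTkinter window, AUDIO modes speak it sentence by sentence with Piper TTS via Dimits, and the _CLIPBOARD variants append the clipboard contents to the prompt.
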
# Imports
import socket
import threading
import uuid
import queue
import tkinter as tk
import pyperclip
from playsound import playsound
from time import sleep
import customtkinter
import google.generativeai as genai
from dimits import Dimits
from assistant import Assistant

# Configuration and Global Variables
GOOGLE_API_KEY = ""  # Fill this out
AUDIO_FOLDER = ""  # Fill this out
genai.configure(api_key=GOOGLE_API_KEY)
# Queues for processing
to_be_sentenced = queue.Queue()
to_be_generated = queue.Queue()
to_be_spoken = queue.Queue()
# State flags
is_listening = True
is_processing = False
is_speaking = False
is_generating = False
is_window_open = False
is_working = False
clipboard = False
# Debug flag and mode
DEBUG = False
mode = "TEXT"
text_area = None

# Debug Function
def debug(text):
    if DEBUG:
        print(text)


# GUI Functionality
def launch_gui_thread():
    global text_area
    root = customtkinter.CTk()
    root.title("LLM Response")
    root.geometry("960x480")
    text_area = customtkinter.CTkTextbox(
        root,
        wrap=tk.WORD,
        height=360,
        width=920,
        font=customtkinter.CTkFont(size=24, family="Helvetica"),
    )
    text_area.pack(padx=10, pady=10)
    root.mainloop()
    # mainloop() has returned, i.e. the window was closed
    text_area = None


# Text Processing and Speech Functions
def append_text(sentence):
    global text_area
    if text_area is not None:
        debug(f"Adding sentence to textarea: {sentence}")
        text_area.insert(tk.END, sentence)
        text_area.update()
    else:
        debug("Textarea is none, not adding anything")


def generate_voice(text):
    global is_generating
    is_generating = True
    debug(f"Generating speech for: {text}")
    # Dimits wraps Piper TTS; this loads a Piper voice model
    dt = Dimits("voice-en-us-danny-low")
    random_uuid = uuid.uuid4()
    filename = str(random_uuid)
    to_be_spoken.put(filename)
    dt.text_2_audio_file(
        text, filename, AUDIO_FOLDER, format="wav"
    )
    is_generating = False
    threading.Thread(target=speak_next).start()


def speak_next():
    global is_speaking
    if not is_speaking and not to_be_spoken.empty():
        next_filename = to_be_spoken.get()
        if next_filename:
            next_filename = f"{AUDIO_FOLDER}/{next_filename}.wav"
            debug(f"Playing audio: {next_filename}")
            is_speaking = True
            playsound(next_filename)
            is_speaking = False
            speak_next()


# LLM Text Processing
def llm_process_text(text):
    global genai, text_area
    print(f"Sending: {text}")
    try:
        model = genai.GenerativeModel("gemini-pro")
        if mode == "AUDIO":
            prompt = f"You will receive input spoken aloud by a user. Respond with text that should be spoken aloud to the user via TTS. Don't use bulleted lists or asterisks in your response. Keep your response brief and concise. Share your response to the following prompt:\n{text}"
        else:
            prompt = text
        if clipboard is True:
            clipboard_contents = pyperclip.paste()
            prompt = f"{prompt}\n\n{clipboard_contents}"
        response = model.generate_content(
            prompt,
            stream=True,
        )
    except Exception:
        print("Gemini Error")
        return
    if text_area is None and mode == "TEXT":
        gui_thread = threading.Thread(target=launch_gui_thread)
        gui_thread.start()
        # Can probably remove this
        sleep(0.5)
    for chunk in response:
        try:
            if (
                not to_be_generated.empty()
                and is_generating is False
                and mode == "AUDIO"
            ):
                debug(f"Sending: {text} to PIPER")
                text = to_be_generated.get()
                threading.Thread(target=generate_voice, args=[text]).start()
            if chunk.text is not None:
                if mode == "TEXT":
                    debug(chunk.text)
                    debug("Appending")
                    debug("\n")
                    append_text(chunk.text)
                if mode == "AUDIO":
                    blank_audio = "[BLANK_AUDIO]"
                    response_text = chunk.text.replace(blank_audio, "")
                    debug(response_text)
                    words = response_text.split(" ")
                    for word in words:
                        to_be_sentenced.put(word)
                        if (
                            word.strip().endswith(".")
                            or word.strip().endswith("\n")
                            or word.strip().endswith("?")
                            or word.strip().endswith("!")
                        ):
                            debug("Queueing sentence")
                            items = []
                            while not to_be_sentenced.empty():
                                items.append(to_be_sentenced.get())
                            text = " ".join(items)
                            if len(text) > 0:
                                to_be_generated.put(text)
                            if is_generating is False:
                                debug(f"Sending: {text} to PIPER")
                                text = to_be_generated.get()
                                threading.Thread(
                                    target=generate_voice, args=[text]
                                ).start()
        except Exception as _:
            # Figure out what to do when this is encountered
            pass
    debug("Received all chunks")
    if mode == "AUDIO":
        items = []
        while not to_be_sentenced.empty():
            items.append(to_be_sentenced.get())
        text = " ".join(items)
        if len(text) > 0:
            to_be_generated.put(text)
        while not to_be_generated.empty():
            if is_generating is False:
                debug(f"Sending: {text} to PIPER")
                text = to_be_generated.get()
                if len(text) > 0:
                    threading.Thread(target=generate_voice, args=[text]).start()
    global is_working
    is_working = False


# Speech and Server Functions
def process_speech(text):
    global my_assistant
    debug(f"Got Text: {text}")
    if my_assistant.running:
        my_assistant.stop()
    playsound("searching.wav")
    llm_process_text(text)


def listen_and_process():
    global my_assistant
    my_assistant.start()


def handle_client_connection(client_socket):
    global is_working, mode, clipboard
    data = client_socket.recv(1024)
    if not is_working:
        is_working = True
        message = data.decode("utf-8")
        if "_CLIPBOARD" in message:
            mode = message.split("_")[0]
            clipboard = True
        else:
            mode = message
            clipboard = False
        playsound("listening.wav")
        listen_and_process()
    client_socket.close()


def start_server():
    PORT = 9876
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("localhost", PORT))
    server.listen(5)
    print("Waiting for Trigger:")
    while True:
        client_sock, address = server.accept()
        debug(f"Accepted connection from {address[0]}:{address[1]}")
        client_handler = threading.Thread(
            target=handle_client_connection, args=(client_sock,)
        )
        client_handler.start()


# Assistant Initialization
debug("Launching Assistant...")
my_assistant = Assistant(
    commands_callback=process_speech,
    input_device=8,  # device index; pick one from Assistant.available_devices()
    silence_threshold=2,
    block_duration=20,
    n_threads=8,
    model="base.en",
)

# Main Function
if __name__ == "__main__":
    start_server()
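
To run: fill in GOOGLE_API_KEY and AUDIO_FOLDER, start this server, then send one of the four modes with the trigger script (e.g. from a hotkey). The listening.wav and searching.wav chimes are loaded from the working directory, so they are expected to exist there.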