Poor Man's Siri
assistant.py — a modified version of the pywhispercpp "assistant" example:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
A modified version of the "assistant" example from pywhispercpp
"""
import queue
import time
from typing import Callable

import numpy as np
import sounddevice as sd
import pywhispercpp.constants as constants
import logging
from pywhispercpp.model import Model
import webrtcvad


class Assistant:
    def __init__(
        self,
        model="tiny",
        input_device: int = None,
        silence_threshold: int = 8,
        q_threshold: int = 16,
        block_duration: int = 30,
        commands_callback: Callable[[str], None] = None,
        model_log_level: int = logging.INFO,
        **model_params,
    ):
""" | |
:param model: whisper.cpp model name or a direct path to a`ggml` model | |
:param input_device: The input device (aka microphone), keep it None to take the default | |
:param silence_threshold: The duration of silence after which the inference will be running | |
:param q_threshold: The inference won't be running until the data queue is having at least `q_threshold` elements | |
:param block_duration: minimum time audio updates in ms | |
:param commands_callback: The callback to run when a command is received | |
:param model_log_level: Logging level | |
:param model_params: any other parameter to pass to the whsiper.cpp model see ::: pywhispercpp.constants.PARAMS_SCHEMA | |
""" | |
        self.running = True
        self.input_device = input_device
        self.sample_rate = constants.WHISPER_SAMPLE_RATE  # same as whisper.cpp
        self.channels = 1  # same as whisper.cpp
        self.block_duration = block_duration
        self.block_size = int(self.sample_rate * self.block_duration / 1000)
        self.q = queue.Queue()
        self.silence_threshold = silence_threshold
        self.q_threshold = q_threshold
        self._silence_counter = 0
        self.pwccp_model = Model(
            model,
            log_level=model_log_level,
            print_realtime=False,
            print_progress=False,
            print_timestamps=False,
            single_segment=True,
            no_context=True,
            **model_params,
        )
        self.commands_callback = commands_callback
        self.vad = webrtcvad.Vad(1)
    def _audio_callback(self, indata, frames, time, status):
        """
        This is called (from a separate thread) for each audio block.
        """
        if status:
            logging.warning(f"underlying audio stack warning: {status}")
        assert frames == self.block_size
        audio_data = map(
            lambda x: (x + 1) / 2, indata
        )  # normalize from [-1,+1] to [0,1]
        audio_data = np.fromiter(audio_data, np.float16)
        audio_data = audio_data.tobytes()
        detection = self.vad.is_speech(audio_data, self.sample_rate)
        if detection:
            self._silence_counter = 0
            self.q.put(indata.copy())
        else:
            if self._silence_counter >= self.silence_threshold:
                if self.q.qsize() > self.q_threshold:
                    self._transcribe_speech()
                self._silence_counter = 0
            else:
                self._silence_counter += 1
    def _transcribe_speech(self):
        logging.info("Speech detected ...")
        audio_data = np.array([])
        while self.q.qsize() > 0:
            # get all the data from the q
            audio_data = np.append(audio_data, self.q.get())
        # Appending zeros to the audio data as a workaround for small audio packets (small commands)
        audio_data = np.concatenate(
            [audio_data, np.zeros((int(self.sample_rate) + 10))]
        )
        # running the inference
        res = self.pwccp_model.transcribe(audio_data)
        self._new_segment_callback(res)

    def _new_segment_callback(self, seg):
        if self.commands_callback:
            self.commands_callback(seg[0].text)
    def stop(self) -> None:
        print("Assistant stopped")
        self.running = False

    def start(self) -> None:
        """
        Use this function to start the assistant
        :return: None
        """
        logging.info("Starting Assistant ...")
        self.running = True
        with sd.InputStream(
            device=self.input_device,  # the default input device
            channels=self.channels,
            samplerate=constants.WHISPER_SAMPLE_RATE,
            blocksize=self.block_size,
            callback=self._audio_callback,
        ):
            try:
                logging.info("Assistant is listening ... (CTRL+C to stop)")
                while self.running is True:
                    time.sleep(0.1)
            except KeyboardInterrupt:
                logging.info("Assistant stopped")

    @staticmethod
    def available_devices():
        return sd.query_devices()
def _main():
    # Minimal CLI so the file can run standalone; defaults mirror the constructor.
    import argparse

    parser = argparse.ArgumentParser(description="Poor Man's Siri assistant")
    parser.add_argument("--model", default="tiny")
    parser.add_argument("--input_device", type=int, default=None)
    parser.add_argument("--silence_threshold", type=int, default=8)
    parser.add_argument("--block_duration", type=int, default=30)
    args = parser.parse_args()

    my_assistant = Assistant(
        model=args.model,
        input_device=args.input_device,
        silence_threshold=args.silence_threshold,
        block_duration=args.block_duration,
        commands_callback=print,
    )
    my_assistant.start()


if __name__ == "__main__":
    _main()
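If the default microphone isn't the right one (the server script further down hard-codes input_device=8), a minimal sketch like the following lists the available devices and runs the assistant with a plain print callback. The device index and model name here are placeholders to adjust for your machine:

# Minimal sketch, assuming assistant.py sits in the same directory.
from assistant import Assistant

print(Assistant.available_devices())  # wraps sounddevice.query_devices()

helper = Assistant(model="base.en", input_device=2, commands_callback=print)
helper.start()  # CTRL+C to stop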
Trigger client — sends one of the four modes to the server over a local socket:
import socket
import sys


# CLI args are: TEXT, TEXT_CLIPBOARD, AUDIO, AUDIO_CLIPBOARD for the 4 modes
def trigger_action():
    PORT = 9876
    try:
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client_socket.connect(("localhost", PORT))
        client_socket.send(sys.argv[1].encode("utf-8"))
        print("Trigger sent.")
        client_socket.close()
    except ConnectionRefusedError:
        print("Failed to connect to the server. Make sure the server is running.")


if __name__ == "__main__":
    trigger_action()
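With the client saved as, say, trigger.py (the filename is an assumption; the gist doesn't name it), the four modes map to invocations like:

python trigger.py TEXT              # show the LLM reply in the GUI window
python trigger.py TEXT_CLIPBOARD    # same, but append the clipboard to the prompt
python trigger.py AUDIO             # speak the reply via Piper TTS
python trigger.py AUDIO_CLIPBOARD   # same, with the clipboard appended

Binding these to keyboard shortcuts is what makes the whole thing feel like a voice assistant rather than a script.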
Main server — listens for triggers, records a spoken command, and streams the Gemini response either to a GUI window (TEXT) or to Piper TTS (AUDIO):
# Imports
import socket
import threading
import uuid
import queue
import tkinter as tk
import pyperclip
from playsound import playsound
from time import sleep
import customtkinter
import google.generativeai as genai
from dimits import Dimits
from assistant import Assistant

# Configuration and Global Variables
GOOGLE_API_KEY = ""  # Fill this out
AUDIO_FOLDER = ""  # Fill this out
genai.configure(api_key=GOOGLE_API_KEY)

# Queues for processing
to_be_sentenced = queue.Queue()
to_be_generated = queue.Queue()
to_be_spoken = queue.Queue()

# State flags
is_listening = True
is_processing = False
is_speaking = False
is_generating = False
is_window_open = False
is_working = False
clipboard = False

# Debug flag and mode
DEBUG = False
mode = "TEXT"
text_area = None


# Debug Function
def debug(text):
    if DEBUG:
        print(text)
# GUI Functionality
def launch_gui_thread():
    global text_area
    root = customtkinter.CTk()
    root.title("LLM Response")
    root.geometry("960x480")
    text_area = customtkinter.CTkTextbox(
        root,
        wrap=tk.WORD,
        height=360,
        width=920,
        font=customtkinter.CTkFont(size=24, family="Helvetica"),
    )
    text_area.pack(padx=10, pady=10)
    root.mainloop()
    text_area = None


# Text Processing and Speech Functions
def append_text(sentence):
    global text_area
    if text_area is not None:
        debug(f"Adding sentence to textarea: {sentence}")
        text_area.insert(tk.END, sentence)
        text_area.update()
    else:
        debug("Textarea is none, not adding anything")


def generate_voice(text):
    global is_generating
    is_generating = True
    debug(f"Generating speech for: {text}")
    dt = Dimits("voice-en-us-danny-low")
    random_uuid = uuid.uuid4()
    filename = str(random_uuid)
    to_be_spoken.put(filename)
    dt.text_2_audio_file(
        text, filename, AUDIO_FOLDER, format="wav"
    )
    is_generating = False
    threading.Thread(target=speak_next).start()


def speak_next():
    global is_speaking
    if not is_speaking and not to_be_spoken.empty():
        next_filename = to_be_spoken.get()
        if next_filename:
            next_filename = f"{AUDIO_FOLDER}/{next_filename}.wav"
            debug(f"Playing audio: {next_filename}")
            is_speaking = True
            playsound(next_filename)
            is_speaking = False
            speak_next()
# LLM Text Processing
def llm_process_text(text):
    global genai, text_area
    print(f"Sending: {text}")
    try:
        model = genai.GenerativeModel("gemini-pro")
        if mode == "AUDIO":
            prompt = f"You will receive input spoken aloud by a user. Respond with text that should be spoken aloud to the user via TTS. Don't use bulleted lists or asterisks in your response. Keep your response brief and concise. Share your response to the following prompt:\n{text}"
        else:
            prompt = text
        if clipboard is True:
            clipboard_contents = pyperclip.paste()
            prompt = f"{prompt}\n\n{clipboard_contents}"
        response = model.generate_content(
            prompt,
            stream=True,
        )
    except Exception:
        print("Gemini Error")
        return
    if text_area is None and mode == "TEXT":
        gui_thread = threading.Thread(target=launch_gui_thread)
        gui_thread.start()
        # Can probably remove this
        sleep(0.5)
    for chunk in response:
        try:
            if (
                not to_be_generated.empty()
                and is_generating is False
                and mode == "AUDIO"
            ):
                debug(f"Sending: {text} to PIPER")
                text = to_be_generated.get()
                threading.Thread(target=generate_voice, args=[text]).start()
            if chunk.text is not None:
                if mode == "TEXT":
                    debug(chunk.text)
                    debug("Appending")
                    debug("\n")
                    append_text(chunk.text)
                if mode == "AUDIO":
                    blank_audio = "[BLANK_AUDIO]"
                    response_text = chunk.text.replace(blank_audio, "")
                    debug(response_text)
                    words = response_text.split(" ")
                    for word in words:
                        to_be_sentenced.put(word)
                        if (
                            word.strip().endswith(".")
                            or word.strip().endswith("\n")
                            or word.strip().endswith("?")
                            or word.strip().endswith("!")
                        ):
                            debug("Queueing sentence")
                            items = []
                            while not to_be_sentenced.empty():
                                items.append(to_be_sentenced.get())
                            text = " ".join(items)
                            if len(text) > 0:
                                to_be_generated.put(text)
                            if is_generating is False:
                                debug(f"Sending: {text} to PIPER")
                                text = to_be_generated.get()
                                threading.Thread(
                                    target=generate_voice, args=[text]
                                ).start()
        except Exception as _:
            # Figure out what to do when this is encountered
            pass
    debug("Received all chunks")
    if mode == "AUDIO":
        items = []
        while not to_be_sentenced.empty():
            items.append(to_be_sentenced.get())
        text = " ".join(items)
        if len(text) > 0:
            to_be_generated.put(text)
        while not to_be_generated.empty():
            if is_generating is False:
                debug(f"Sending: {text} to PIPER")
                text = to_be_generated.get()
                if len(text) > 0:
                    threading.Thread(target=generate_voice, args=[text]).start()
    global is_working
    is_working = False
# Speech and Server Functions
def process_speech(text):
    global my_assistant
    debug(f"Got Text: {text}")
    if my_assistant.running:
        my_assistant.stop()
    playsound("searching.wav")
    llm_process_text(text)


def listen_and_process():
    global my_assistant
    my_assistant.start()


def handle_client_connection(client_socket):
    global is_working, mode, clipboard
    data = client_socket.recv(1024)
    if not is_working:
        is_working = True
        message = data.decode("utf-8")
        if "_CLIPBOARD" in message:
            mode = message.split("_")[0]
            clipboard = True
        else:
            mode = message
            clipboard = False
        playsound("listening.wav")
        listen_and_process()
    client_socket.close()


def start_server():
    PORT = 9876
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("localhost", PORT))
    server.listen(5)
    print("Waiting for Trigger:")
    while True:
        client_sock, address = server.accept()
        debug(f"Accepted connection from {address[0]}:{address[1]}")
        client_handler = threading.Thread(
            target=handle_client_connection, args=(client_sock,)
        )
        client_handler.start()
# Assistant Initialization
debug("Launching Assistant...")
my_assistant = Assistant(
    commands_callback=process_speech,
    input_device=8,
    silence_threshold=2,
    block_duration=20,
    n_threads=8,
    model="base.en",
)

# Main Function
if __name__ == "__main__":
    start_server()
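The server plays searching.wav and listening.wav as audio prompts, but those files aren't part of the gist. A minimal sketch, assuming the Dimits call behaves as in the server code above (writing "<directory>/<filename>.wav"), can generate stand-ins in the server's working directory:

# Minimal sketch: create the two prompt sounds the server expects.
import os
from dimits import Dimits

dt = Dimits("voice-en-us-danny-low")
for name, phrase in [("listening", "Listening"), ("searching", "Searching")]:
    dt.text_2_audio_file(phrase, name, os.getcwd(), format="wav")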