import base64
import os
import threading
import uuid
import wave
from queue import Queue

import cv2
import openai
import pyaudio
import whisper
from pydub import AudioSegment
from pydub.playback import play
from pydub.silence import split_on_silence
# Load the Whisper model once at startup; "base" trades accuracy for speed.
model = whisper.load_model("base")

# Recording configuration
device_index = 1  # PyAudio input device index; machine-specific (see the helper below)
SHOULD_LISTEN = False
START_KEYWORD = 'hello jarvis'
STOP_KEYWORD = 'jarvis out'
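
# A minimal helper (editor's sketch, not part of the original gist) for
# discovering which index to put in device_index above. It uses only
# standard PyAudio calls; run it once and copy the index of your microphone.
def list_input_devices() -> None:
    """Print every audio device capable of recording input."""
    p = pyaudio.PyAudio()
    for i in range(p.get_device_count()):
        info = p.get_device_info_by_index(i)
        if info.get('maxInputChannels', 0) > 0:
            print(f"{i}: {info['name']}")
    p.terminate()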
# Record audio continuously, handing each fixed-length window to a worker thread.
def record_audio() -> None:
    """
    Record audio from the microphone in RECORD_SECS-long windows and hand
    each window to a background thread for transcription. Runs until Ctrl-C.
    """
    CHUNK = 512
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 44100
    RECORD_SECS = 5

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    input_device_index=device_index,
                    frames_per_buffer=CHUNK)

    frames = []
    threads = []
    # Number of CHUNK-sized reads that make up one RECORD_SECS-long window.
    record_width = int(RATE * RECORD_SECS / CHUNK)
    transcription_queue = Queue()

    print("Recording audio...")
    while True:
        try:
            data = stream.read(CHUNK)
            frames.append(data)
            if len(frames) >= record_width:
                new_thread = threading.Thread(target=save_and_transcribe, args=(
                    frames, CHANNELS, RATE, p.get_sample_size(FORMAT), transcription_queue,))
                new_thread.start()
                threads.append(new_thread)
                frames = []
        except KeyboardInterrupt:
            break

    print("Finished recording.")
    for th in threads:
        th.join()

    transcriptions = []
    while not transcription_queue.empty():
        transcriptions.append(transcription_queue.get())
    final_transcription = ' '.join(transcriptions)

    stream.stop_stream()
    stream.close()
    p.terminate()

    print('Final transcription:')
    print(final_transcription)
def save_and_transcribe(frames: list[bytes], CHANNELS: int, RATE: int, SAMPLE_WIDTH: int, output_queue: Queue) -> None:
    """
    Write one window of raw frames to a temporary WAV file, transcribe it,
    and watch for the start/stop keywords. When the stop keyword is heard,
    assemble everything said since the start keyword and send it to the AI.
    """
    global SHOULD_LISTEN
    try:
        filename = f'/tmp/{uuid.uuid4()}.wav'
        wf = wave.open(filename, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(SAMPLE_WIDTH)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
        ret = transcribe_audio(filename)
        if START_KEYWORD in ret.lower():
            SHOULD_LISTEN = True
    except Exception as e:
        print(e)
        ret = ''
    finally:
        if SHOULD_LISTEN:
            output_queue.put(ret)
            if STOP_KEYWORD in ret.lower():
                SHOULD_LISTEN = False
                # Stop keyword heard: take a screenshot and send an OpenAI request.
                transcriptions = []
                while not output_queue.empty():
                    transcriptions.append(output_queue.get())
                user_message = ' '.join(transcriptions)
                # Keep only the text between the start and stop keywords
                # (assumes both keywords appear in the joined transcript).
                user_message = user_message[user_message.lower().find(START_KEYWORD) + len(START_KEYWORD):]
                user_message = user_message[:user_message.lower().find(STOP_KEYWORD)]
                call_ai(user_message)
# Transcribe audio using Whisper, skipping windows with no detected speech.
def transcribe_audio(audio_file: str) -> str:
    """
    Transcribe an audio file using Whisper.

    Parameters:
    audio_file (str): The path to the audio file to be transcribed.

    Returns:
    str: The transcription, or an empty string if no speech was detected.
    """
    if is_speech_in_audio(audio_file):
        result = model.transcribe(audio_file)
        transcription = result['text'].strip()
        print("Transcription:", transcription)
        return transcription
    return ''
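
# Note (editor's addition): on CPU-only machines Whisper prints an FP16
# warning on every call; model.transcribe accepts fp16=False to run in
# FP32 and silence it, e.g. model.transcribe(audio_file, fp16=False).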
# Check whether there is any speech in the audio file before transcribing it.
def is_speech_in_audio(audio_file: str) -> bool:
    """
    Check if there is any speech in the audio file.

    Parameters:
    audio_file (str): The path to the audio file to be checked.

    Returns:
    bool: True if there is speech in the audio file, False otherwise.
    """
    # Load the audio file.
    audio = AudioSegment.from_wav(audio_file)
    # Split on silence: anything 16 dB below the clip's average loudness
    # for at least 500 ms counts as silence.
    chunks = split_on_silence(audio, min_silence_len=500, silence_thresh=audio.dBFS - 16)
    # Treat any non-silent chunk longer than half a second as speech.
    for chunk in chunks:
        if chunk.duration_seconds > 0.5:
            return True
    return False
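
# A lighter-weight alternative (editor's sketch, not part of the original
# gist): pydub.silence.detect_nonsilent returns [start_ms, end_ms] ranges
# directly, so no chunk audio has to be re-assembled just to measure
# durations. Same thresholds as above.
def is_speech_in_audio_fast(audio_file: str) -> bool:
    from pydub.silence import detect_nonsilent
    audio = AudioSegment.from_wav(audio_file)
    ranges = detect_nonsilent(audio, min_silence_len=500,
                              silence_thresh=audio.dBFS - 16)
    # Any non-silent stretch longer than half a second counts as speech.
    return any((end - start) > 500 for start, end in ranges)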
# Take a screenshot of the current screen and call the OpenAI
# gpt-4o-2024-08-06 model with the user message and the screenshot.
def call_ai(user_message: str) -> None:
    """
    Take a screenshot of the current screen, send it together with the
    user's message to the OpenAI gpt-4o-2024-08-06 model, and speak the
    reply aloud.

    Parameters:
    user_message (str): The user's message to be sent to the AI model.
    """
    # Take a screenshot and base64-encode it as a PNG for the API.
    screenshot_path = take_screenshot()
    image = cv2.imread(screenshot_path)
    _, buffer = cv2.imencode('.png', image)
    image_base64 = base64.b64encode(buffer).decode('utf-8')

    # Call the model with the user message and the screenshot.
    client = openai.OpenAI(api_key=os.environ['OPENAI_API_KEY'])
    messages = [
        {
            'role': 'system',
            'content': "You are a language model. Your job is to help the user out. Keep it short enough, but not too short. They will provide you with a screenshot of what they are viewing and text describing what they want from you. Be funny: you're not just a question-answering bot, you should act like a friend. Although you're a language model, enjoy yourself and make sure the user enjoys themselves too."
        },
        {
            'role': 'user',
            'content': [
                {
                    'type': 'image_url',
                    'image_url': {
                        'url': f"data:image/png;base64,{image_base64}"
                    }
                },
                {
                    'type': 'text',
                    'text': user_message
                }
            ]
        }
    ]
    print('Calling GPT ...')
    llm_response = client.chat.completions.create(
        model='gpt-4o-2024-08-06',
        messages=messages,
        temperature=0.8,
        max_tokens=4096,
    )
    response = llm_response.choices[0].message.content
    # Print the AI's response.
    print("AI's response:", response)

    # Convert the reply to speech and play it. (stream_to_file still works
    # but newer openai-python releases prefer audio.speech.with_streaming_response.)
    speech_file_path = "./speech.wav"
    speech_response = client.audio.speech.create(
        model="tts-1",
        voice="onyx",
        input=response,
        response_format='wav',
    )
    speech_response.stream_to_file(speech_file_path)
    audio = AudioSegment.from_wav(speech_file_path)
    play(audio)
# Take a screenshot of the current screen.
def take_screenshot() -> str:
    """
    Take a screenshot of the current screen and save it to a file.

    Returns:
    str: The path to the saved screenshot file.
    """
    screenshot_path = 'screenshot.png'
    # Note: the `screencapture` command is macOS-only (-x mutes the shutter sound).
    os.system(f'screencapture -x {screenshot_path}')
    return screenshot_path
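
# Cross-platform alternative (editor's sketch, not part of the original
# gist): Pillow's ImageGrab.grab() captures the screen on macOS and Windows
# without shelling out. Assumes Pillow is installed (pip install Pillow).
def take_screenshot_pillow() -> str:
    from PIL import ImageGrab
    screenshot_path = 'screenshot.png'
    ImageGrab.grab().save(screenshot_path)
    return screenshot_path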
if __name__ == '__main__':
    record_audio()
    print("Exiting program.")