Created December 10, 2023 17:13
-
-
Save scottire/a25c73d5bd5bba25f0e811c117cc1975 to your computer and use it in GitHub Desktop.
VAD and record audio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import os
import subprocess
import sys
import wave

import pyaudio
import requests
import webrtcvad
def record_audio(filename, duration=5):
    """Record one spoken phrase from the default input device into a WAV file.

    Audio is read as 16-bit mono PCM at 16 kHz in 30 ms frames, and every
    frame is classified with WebRTC VAD. Frames read before the first speech
    frame are discarded. Recording starts at the first speech frame and stops
    once more than ``num_padding_frames`` (1500 ms / 30 ms = 50) consecutive
    non-speech frames have been read; that trailing silence is part of the
    written file.

    Args:
        filename: Path of the WAV file to write.
        duration: Ignored. Kept only so existing callers that pass it keep
            working; the end of the recording is decided by silence
            detection, not by a fixed length.
    """
    vad = webrtcvad.Vad(1)  # Set aggressiveness from 0 to 3
    sample_rate = 16000  # Sample rate suitable for VAD
    chunk_duration_ms = 30  # Each read length in milliseconds
    padding_duration_ms = 1500  # Amount of silence to treat as end of phrase
    frame_size = int(sample_rate * chunk_duration_ms / 1000)  # Samples per read
    num_padding_frames = int(padding_duration_ms / chunk_duration_ms)

    voiced_frames = []
    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(pyaudio.paInt16)
        stream = p.open(format=pyaudio.paInt16,
                        channels=1,
                        rate=sample_rate,
                        input=True,
                        frames_per_buffer=frame_size)
        try:
            triggered = False
            silent_count = 0
            print("Start speaking.")
            while True:
                frame = stream.read(frame_size)
                is_speech = vad.is_speech(frame, sample_rate)
                if not triggered:
                    # Pre-speech frames are dropped instead of being hoarded
                    # in a list nobody reads.
                    if is_speech:
                        triggered = True
                        voiced_frames.append(frame)
                        print("Recording...")
                    continue
                voiced_frames.append(frame)
                if is_speech:
                    silent_count = 0
                    continue
                silent_count += 1
                if silent_count > num_padding_frames:
                    break
            print("Finished recording.")
        finally:
            # Release the audio device even if the loop is interrupted.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(sample_width)
        wf.setframerate(sample_rate)
        wf.writeframes(b''.join(voiced_frames))
def send_audio(filename):
    """Upload an audio file to the local chat endpoint and save the reply.

    The file is POSTed as the multipart field ``file`` to
    ``http://127.0.0.1:5000/chat``. The raw response body is written to
    ``response.mp3`` in the current working directory.

    Args:
        filename: Path of the audio file to upload.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status, so
            an error page is never saved as if it were audio.
    """
    url = 'http://127.0.0.1:5000/chat'
    # Close the upload handle as soon as the request has been sent.
    with open(filename, 'rb') as audio_file:
        response = requests.post(url, files={'file': audio_file})
    response.raise_for_status()
    with open('response.mp3', 'wb') as f:
        f.write(response.content)
def play_audio(filename):
    """Play an audio file with the ``afplay`` command-line player.

    The command is run as an argument list without a shell, so file names
    containing spaces or shell metacharacters are passed through literally
    instead of being interpreted. The player's exit status is ignored.

    Args:
        filename: Path of the audio file to play.
    """
    try:
        subprocess.run(['afplay', filename], check=False)
    except FileNotFoundError:
        # The player binary is not installed; report it rather than crash.
        print("afplay: command not found", file=sys.stderr)
def parse_arguments(argv=None):
    """Parse the command-line options of the tool.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) the process arguments are used.

    Returns:
        argparse.Namespace with a boolean ``test`` attribute that is True
        when ``--test`` was given.
    """
    parser = argparse.ArgumentParser(description='CLI tool for practicing Spanish.')
    parser.add_argument('--test', action='store_true', help='Run in test mode using a predefined audio file.')
    return parser.parse_args(argv)
def main():
    """Run the tool in test mode or in the interactive record/reply loop.

    With ``--test`` the existing ``temp.wav`` is sent once and the reply is
    played; the process exits with status 1 if that file is missing.
    Otherwise a phrase is recorded, sent and the reply played repeatedly
    until the user types ``exit`` or standard input is closed.
    """
    args = parse_arguments()
    audio_filename = 'temp.wav'

    if args.test:
        if not os.path.exists(audio_filename):
            print(f"Test file {audio_filename} not found.")
            sys.exit(1)
        send_audio(audio_filename)
        play_audio('response.mp3')
        return

    # Normal mode: record and send audio
    while True:
        record_audio(audio_filename)
        send_audio(audio_filename)
        play_audio('response.mp3')
        try:
            answer = input("Press Enter to continue or type 'exit' to quit: ")
        except EOFError:
            # Closed stdin (e.g. Ctrl-D) is treated as a request to quit.
            break
        # Tolerate stray whitespace around the keyword.
        if answer.strip().lower() == 'exit':
            break


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.