Last active
December 21, 2020 05:21
-
-
Save zorbaproject/d0ce6078399ea875819f738858fb2f96 to your computer and use it in GitHub Desktop.
Transcribe long audio files with Google Speech API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Original source: https://towardsdatascience.com/how-to-use-google-speech-to-text-api-to-transcribe-long-audio-files-1c886f4eb3e9 | |
#Requirements | |
#sudo pip3 install google-cloud-storage | |
#sudo pip3 install google-cloud-speech | |
#sudo pip3 install pydub | |
#sudo apt install ffmpeg | |
#Create a project on https://console.cloud.google.com/ | |
#Enable Storage Transfer API | |
#https://console.cloud.google.com/storage/ | |
#Enable Speech to Text API | |
#https://console.cloud.google.com/apis/library/speech.googleapis.com | |
#Create a key for a service account and save it as 'service_account.json' in the directory you run this script from (normally the same folder as this script)
#https://console.cloud.google.com/iam-admin/serviceaccounts | |
from pydub import AudioSegment | |
import io | |
import os | |
import os.path | |
from google.cloud import speech | |
#from google.cloud.speech import enums | |
#from google.cloud.speech import types | |
#https://github.com/googleapis/python-speech/blob/master/UPGRADING.md#enums-and-types | |
import wave | |
from google.cloud import storage | |
import sys | |
import json | |
# --- Command-line arguments and credentials ---------------------------------
# argv[1] (required): an mp3 file, a wav file, or a folder containing them.
# argv[2] (optional): language code handed to the Speech API, e.g. 'it-IT'.
if len(sys.argv) > 1:
    filepath = sys.argv[1]
else:
    print("Usage: ./transcribe.py audiofile.mp3 language")
    print("E.g.: ./transcribe.py audio/ it-IT")
    print("Input con be a single mp3 file, a single wav file, or a folder of files.")
    # A missing argument is a usage error, so report failure to the caller
    # instead of exiting with status 0.
    sys.exit(1)

output_filepath = filepath #"./Transcripts/"

if len(sys.argv) > 2:
    lang = sys.argv[2]
else:
    lang = 'it-IT' #'en-US'

# The service-account key is read from a path relative to the current
# working directory.
accountfile = "service_account.json"
with open(accountfile, "r", encoding='utf-8') as text_file:
    lines = text_file.read()

# Raw CR/LF characters are removed before parsing (kept from the original;
# presumably so that a hard-wrapped key file still parses).
accountdata = json.loads(lines.replace("\n", "").replace("\r", ""))

# The bucket used for temporary uploads is named after the project.
bucket_name = accountdata["project_id"] + "-audiofiles"
def mp3_to_wav(audio_file_name):
    """Convert an MP3 file to WAV unless the WAV file already exists.

    Nothing happens when the text after the last dot is not ``mp3``.

    NOTE: the output name is the text between the last two dots plus
    ``.wav``. Anything before an earlier dot (for example a leading ``./``
    or a dotted folder name) is therefore not part of the output path.
    """
    pieces = audio_file_name.split('.')
    if pieces[-1] != 'mp3':
        return

    wav_target = pieces[-2] + '.wav'
    if os.path.isfile(wav_target):
        # Already converted on an earlier run; do not decode again.
        return

    decoded = AudioSegment.from_mp3(audio_file_name)
    print("MP3 to WAV: "+audio_file_name)
    decoded.export(wav_target, format="wav")
def frame_rate_channel(audio_file_name):
    """Return ``(frame_rate, channels)`` as stored in a WAV file's header."""
    reader = wave.open(audio_file_name, "rb")
    try:
        header = reader.getparams()
    finally:
        reader.close()
    return header.framerate, header.nchannels
def stereo_to_mono(audio_file_name):
    """Re-export a WAV file with one channel, overwriting it in place."""
    AudioSegment.from_wav(audio_file_name).set_channels(1).export(
        audio_file_name, format="wav"
    )
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket, creating the bucket if missing.

    Args:
        bucket_name: Name of the target bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to store the file under.

    Errors other than "bucket does not exist" (permissions, network, ...)
    propagate to the caller instead of triggering a bucket creation attempt.
    """
    storage_client = storage.Client.from_service_account_json(accountfile) #storage.Client()

    # lookup_bucket() returns None only when the bucket is absent, so the
    # create step runs for that case alone rather than for every exception.
    bucket = storage_client.lookup_bucket(bucket_name)
    if bucket is None:
        # create_bucket() already returns the new Bucket; no re-fetch needed.
        bucket = storage_client.create_bucket(bucket_name)

    blob = bucket.blob(destination_blob_name)
    print("Upload to bucket "+bucket_name)
    blob.upload_from_filename(source_file_name)
def delete_blob(bucket_name, blob_name):
    """Remove the object named *blob_name* from the bucket *bucket_name*."""
    gcs = storage.Client.from_service_account_json(accountfile) #storage.Client()
    target = gcs.get_bucket(bucket_name).blob(blob_name)
    print("Delete from bucket")
    target.delete()
def _ensure_wav(file_name):
    """Return the WAV path for *file_name*, converting from MP3 when needed."""
    # splitext only strips the final extension, so dots elsewhere in the path
    # (e.g. './audio/talk.v2.mp3') are preserved in the WAV name.
    stem, extension = os.path.splitext(file_name)
    if extension != '.mp3':
        return file_name
    wav_name = stem + '.wav'
    if not os.path.isfile(wav_name):
        print("MP3 to WAV: "+file_name)
        AudioSegment.from_mp3(file_name).export(wav_name, format="wav")
    return wav_name


def google_transcribe(file_name):
    """Transcribe one audio file with the Speech API and return the text.

    The file is converted to WAV if it is an MP3, reduced to one channel if
    it has more (this rewrites the WAV file in place), uploaded to the
    bucket ``bucket_name``, recognised with ``long_running_recognize`` and
    finally removed from the bucket.

    Args:
        file_name: Path to an ``.mp3`` or ``.wav`` file.

    Returns:
        The first alternative of every result, concatenated into one string.
    """
    file_name = _ensure_wav(file_name)
    print("Working on "+file_name)

    frame_rate, channels = frame_rate_channel(file_name)
    if channels > 1:
        stereo_to_mono(file_name)

    destination_blob_name = os.path.basename(file_name)
    upload_blob(bucket_name, file_name, destination_blob_name)

    # From here on the blob exists remotely: always remove it again, even if
    # recognition fails or times out, so failed runs leave nothing behind.
    try:
        gcs_uri = 'gs://'+bucket_name+'/' + destination_blob_name
        client = speech.SpeechClient.from_service_account_json(accountfile) #speech.SpeechClient()
        audio = speech.RecognitionAudio(uri=gcs_uri)
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=frame_rate,
            enable_automatic_punctuation=True,
            language_code=lang)

        print("Transcribing the audio file, this is going to take some time...")
        operation = client.long_running_recognize(config=config, audio=audio)
        response = operation.result(timeout=10000)
    finally:
        delete_blob(bucket_name, destination_blob_name)

    # Skip results that carry no alternative instead of raising IndexError.
    return ''.join(
        result.alternatives[0].transcript
        for result in response.results
        if result.alternatives
    )
def write_transcripts(transcript_filename,transcript):
    """Write *transcript* to *transcript_filename*, replacing any existing content.

    The file is written as UTF-8 so accented characters are stored the same
    way regardless of the platform's default encoding, and the handle is
    closed even if the write fails.
    """
    with open(transcript_filename, "w", encoding="utf-8") as f:
        f.write(transcript)
    print("Wrote text on "+transcript_filename)
if __name__ == "__main__":
    # Script entry point: transcribe either every .wav/.mp3 file in a folder
    # or the single file given on the command line.
    if os.path.isdir(filepath):
        for audio_file_name in os.listdir(filepath):
            # Match on the real extension, not on a substring of the name.
            if not audio_file_name.endswith(('.wav', '.mp3')):
                continue
            file_name = os.path.join(filepath, audio_file_name)
            transcript = google_transcribe(file_name)
            # Swap only the final extension for '.txt'. This guarantees the
            # transcript never gets the same name as the audio file.
            # In folder mode the transcript goes to the working directory.
            transcript_filename = os.path.splitext(audio_file_name)[0] + '.txt'
            write_transcripts(transcript_filename, transcript)
    else:
        audio_file_name = filepath
        transcript = google_transcribe(audio_file_name)
        # Same rule as above; here the transcript lands next to the audio.
        transcript_filename = os.path.splitext(audio_file_name)[0] + '.txt'
        try:
            write_transcripts(transcript_filename, transcript)
        except OSError:
            # Target not writable: fall back to a file in the working
            # directory so the (paid-for) transcript is not lost.
            write_transcripts("output-transcribe.txt", transcript)
#Source file: Public conference with ambient noise | |
#Source duration: 1 hour 50 minutes | |
#Source format: mp3 | |
#Total time: 35 minutes | |
#Language: it-IT | |
#Source file: Public conference with ambient noise | |
#Source duration: 1 hour 50 minutes | |
#Source format: wav | |
#Total time: 20 minutes | |
#Language: it-IT | |
#Source file: Italian Premier official speech on March 11th 2020 | |
#Source duration: 9 minutes | |
#Source format: mp3 | |
#Total time: 3 minutes | |
#Language: it-IT | |
#Estimated costs: 5,36 euros for 325,75 minutes of audio |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment