Skip to content

Instantly share code, notes, and snippets.

@yongjun823
Last active April 21, 2018 07:58
Show Gist options
  • Select an option

  • Save yongjun823/15bdb28db940fadcd8adb46d2549ccce to your computer and use it in GitHub Desktop.

Select an option

Save yongjun823/15bdb28db940fadcd8adb46d2549ccce to your computer and use it in GitHub Desktop.
1) Take video files as input and convert them to FLAC audio 2) Upload the FLAC files to Cloud Storage 3) Extract text with speech recognition 4) Save the video name and text to Firestore
from __future__ import print_function # for compatibility with both python 2 and 3
from subprocess import call # for calling mplayer and lame
import os # help with file handling
def check_file_exists(directory, filename, extension):
    """Return True if ``filename + extension`` is a regular file in *directory*.

    Args:
        directory: Folder to look in.
        filename: File name without its extension.
        extension: Extension including the leading dot, e.g. ``".mp3"``.

    Returns:
        bool: The result of ``os.path.isfile`` on the joined path.
    """
    # os.path.join avoids a doubled separator when *directory* already ends
    # with one and, unlike ``directory + "/" + name``, does not turn an empty
    # directory into a path under the filesystem root.
    return os.path.isfile(os.path.join(directory, filename + extension))
def main(indir, outdir):
    """Convert every not-yet-converted ``.mp4`` in *indir* to a mono ``.flac`` in *outdir*.

    Each video is passed through three external tools: ``mplayer`` (audio dump,
    read back from ``audiodump.wav`` in the current working directory),
    ``lame`` (WAV -> MP3) and ``ffmpeg`` (MP3 -> single-channel FLAC).  The
    intermediate WAV and MP3 files are removed after each video.  Uploading the
    result to cloud storage is not done here.

    Args:
        indir: Directory scanned (non-recursively) for ``*.mp4`` files.
        outdir: Existing, writeable directory that receives the ``*.flac`` files.

    Raises:
        SystemExit: If a directory is missing or not writeable, if inspecting
            the directories fails with ``OSError``, or if no file is left to
            convert.
    """
    try:
        # Validate both folders before doing any work.
        if not os.path.exists(indir):
            raise SystemExit("Error: Input directory '" + indir + "' does not exist. (try prepending './')")
        if not os.path.exists(outdir):
            raise SystemExit("Error: Output directory '" + outdir + "' does not exist.")
        if not os.access(outdir, os.W_OK):
            raise SystemExit("Error: Output directory '" + outdir + "' is not writeable.")
        print("[{0}/*.mp4] --> [{1}/*.flac]".format(indir, outdir))

        # Base names (without extension) of all convertible input files.
        candidates = [
            os.path.splitext(entry)[0]
            for entry in os.listdir(indir)
            if entry.endswith(".mp4")
        ]
        # Skip inputs whose final product already exists.  The check must look
        # for the .flac: the intermediate .mp3 is deleted after every
        # conversion, so testing for it would never skip anything.
        files = [f for f in candidates if not check_file_exists(outdir, f, ".flac")]
    except OSError as e:
        raise SystemExit(e)

    if not files:
        raise SystemExit("Could not find any files to convert that have not already been converted.")

    wav_path = "audiodump.wav"  # file name lame reads the mplayer dump from
    for filename in files:
        source = os.path.join(indir, filename + ".mp4")
        mp3_path = os.path.join(outdir, filename + ".mp3")
        flac_path = os.path.join(outdir, filename + ".flac")
        print("-- converting {0}/{2}.mp4 to {1}/{2}.flac --".format(indir, outdir, filename))

        steps = (
            ["mplayer", "-novideo", "-nocorrect-pts", "-ao", "pcm:waveheader", source],
            ["lame", "-v", wav_path, mp3_path],
            ["ffmpeg", "-i", mp3_path, "-ac", "1", flac_path],
        )
        converted = True
        try:
            for command in steps:
                status = call(command)
                if status != 0:
                    # A failed step would otherwise feed stale or missing data
                    # to the next tool; give up on this file only.
                    print("Error: {0} exited with status {1}; skipping {2}".format(
                        command[0], status, source))
                    converted = False
                    break
        finally:
            # Always drop the intermediates; drop a partial .flac as well so
            # the file is retried on the next run instead of being skipped.
            leftovers = [wav_path, mp3_path]
            if not converted:
                leftovers.append(flac_path)
            for leftover in leftovers:
                if os.path.exists(leftover):
                    os.remove(leftover)
# Fixed input and output directories (no command-line arguments are parsed);
# the conversion starts as soon as this script is executed.
args = ["./video", "./output"]
main(*args)
from google.cloud import speech
from google.cloud.speech import enums
from google.cloud.speech import types
from google.cloud import storage
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
import os
# Create all service clients once, up front.
# Firebase is initialised with Application Default Credentials; the
# '{project-id}' value is a placeholder to be replaced with the real project.
cred = credentials.ApplicationDefault()
firebase_options = {'projectId': '{project-id}'}
firebase_admin.initialize_app(cred, firebase_options)

db = firestore.client()          # Firestore handle used to store transcripts
client = speech.SpeechClient()   # speech recognition client

# Cloud Storage bucket that receives the audio files ('{bucket-name}' is a
# placeholder as well).
storage_client = storage.Client()
bucket_name = '{bucket-name}'
bucket = storage_client.get_bucket(bucket_name)
# Upload every entry of the local output folder to the bucket, stored under
# the 'output/' object prefix.
output_dir = 'output'
for entry in os.listdir(output_dir):
    local_path = os.path.join(output_dir, entry)
    print('{} file stroage upload!'.format(entry))
    bucket.blob('output/' + entry).upload_from_filename(local_path)
# Recognition settings shared by all requests: FLAC audio, Korean language,
# sample rate declared as 44100 Hz.
flac_encoding = enums.RecognitionConfig.AudioEncoding.FLAC
config = types.RecognitionConfig(
    encoding=flac_encoding,
    language_code='ko-KR',
    sample_rate_hertz=44100,
)

# Prefix put in front of each file name to form its gs:// URI
# ('{bucket-url}' is a placeholder).
bucket_url = 'gs://{bucket-url}'

# Names to transcribe: every entry of the local 'output' folder,
# e.g. 'AT-cm_224754895-480.flac'.
file_names = os.listdir('output')

# Firestore collection that receives the transcripts.
doc_ref = db.collection(u'label')
# Transcribe each file from Cloud Storage and store every recognition
# alternative as its own Firestore document.
for name in file_names:
    # NOTE(review): the URI is a plain concatenation with no separator, so the
    # '{bucket-url}' placeholder presumably has to end with the object prefix
    # (the upload step stores objects under 'output/') - confirm.
    audio = types.RecognitionAudio(uri=bucket_url + name)
    response = client.long_running_recognize(config=config, audio=audio)
    op_result = response.result()  # wait for the long-running operation

    # Flatten results -> alternatives, keeping the original visiting order.
    alternatives = (
        alt
        for res in op_result.results
        for alt in res.alternatives
    )
    for alternative in alternatives:
        print(alternative.transcript)
        print(alternative.confidence)
        # NOTE(review): on Python 3 str.encode() returns bytes, so both fields
        # are written as byte strings rather than text - confirm this is
        # what the readers of the 'label' collection expect.
        record = {
            u'subscription': str.encode(alternative.transcript),
            u'video_name': str.encode(name)
        }
        doc_ref.document().set(record)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment