|
#!/usr/bin/env python3 |
|
# Copyright 2017 The Abseil Authors. |
|
# |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
""" |
|
Author: jimmymkude@ / bookman@ |
|
|
|
This script takes in a Cloud VideoIntelligence API Speech Transcription response and converts it to WebVTT.
|
|
|
Modify is_break_point for max words / min words / max time per subtitle logic. |
|
|
|
Before running install google-cloud-videointelligence API: |
|
`pip3 install google-cloud-videointelligence` |
|
|
|
Auth is done through a service account, which needs Video Intelligence & GCS permissions.
|
|
|
Usage: |
|
python3 captioner.py --video=<gcs url> --service_account=service_account.json --out=subtitle.vtt |
|
|
|
Example: |
|
./captioner.py \ |
|
--video=gs://us-central1-cdn-test-files/static-mp4/stay_tuned.mp4 \ |
|
--service_account=service_account.json \ |
|
--out=stay_tuned.vtt |
|
|
|
""" |
|
import argparse |
|
import json |
|
import math |
|
from google.protobuf.json_format import MessageToJson |
|
from google.cloud import videointelligence |
|
|
|
|
|
|
|
def main():
    """Transcribes the video, segments it into captions, and writes WebVTT.

    Reads CLI flags, runs speech transcription against the GCS video,
    prints the generated WebVTT to stdout, and writes it to --out.
    """
    args = arg_parse()

    # Client is not thread safe.
    # However, one should re-use client within the
    # same thread, and avoid creating a new client
    # per user request.
    video_client = (videointelligence
                    .VideoIntelligenceServiceClient
                    .from_service_account_file(args.service_account))

    alternatives = transcribe(video_client, args.video)
    captions = break_down_transcriptions(alternatives)
    vtt = gen_vtt(captions)
    print(vtt)

    # 'with' guarantees the file handle is closed even if write() raises;
    # the original open()/close() pair leaked the handle on error.
    with open(args.out, 'w') as f:
        f.write(vtt)
|
|
|
def arg_parse():
    """Defines and parses the command-line flags.

    Returns:
      The argparse.Namespace with 'video', 'service_account' and 'out'.
    """
    parser = argparse.ArgumentParser(description='WebVTT Caption Video in GCS.')

    # All three flags are required strings; register them from a table
    # to keep the definitions uniform.
    required_flags = (
        ('--video', 'gcs path to video, e.g: gs://mybucket/myvideo.mp4'),
        ('--service_account', 'path to service account, e.g: service_account.json'),
        ('--out', 'where to write vtt subtitles, e.g: subtitles.vtt'),
    )
    for flag, help_text in required_flags:
        parser.add_argument(flag, type=str, required=True, help=help_text)

    return parser.parse_args()
|
|
|
def transcribe(video_client, gcs_video_path, language='en-US'):
    """Transcribes speech in a video file stored in GCS.

    Args:
      video_client: a VideoIntelligenceServiceClient.
      gcs_video_path: gs:// URI of the video to transcribe.
      language: BCP-47 language code for transcription (default 'en-US').

    Returns:
      The transcription alternatives of the first speech transcription
      in the single annotation result.
    """
    features = [videointelligence.enums.Feature.SPEECH_TRANSCRIPTION]

    # Bug fix: the 'language' parameter was previously ignored and
    # 'en-US' was always hard-coded in the config.
    config = videointelligence.types.SpeechTranscriptionConfig(
        language_code=language,
        enable_automatic_punctuation=True)
    video_context = videointelligence.types.VideoContext(
        speech_transcription_config=config)

    operation = video_client.annotate_video(
        gcs_video_path, features=features,
        video_context=video_context)

    # Blocks until the long-running operation completes (up to 10 minutes).
    result = operation.result(timeout=600)

    # There is only one annotation_result since only
    # one video is processed.
    return (result
            .annotation_results[0]
            .speech_transcriptions[0]
            .alternatives)
|
|
|
def duration_to_seconds(duration):
    """Returns a protobuf Duration as a float number of seconds."""
    # Combine the whole-second part with the nanosecond remainder.
    fractional = duration.nanos * 1e-9
    return duration.seconds + fractional
|
|
|
def is_punctuation(char):
    """Returns True iff 'char' is a single sentence-punctuation mark."""
    # Strings longer (or shorter) than one character never qualify.
    return len(char) == 1 and char in '.,!?:;'
|
|
|
|
|
def is_break_point(word, diff, numberOfWordsInSentence):
    """Decides whether the current caption should end after this word.

    Args:
      word: the word just appended to the caption.
      diff: seconds elapsed since the caption started.
      numberOfWordsInSentence: words accumulated in the caption so far.

    Returns:
      True if the caption should be flushed at this word.
    """
    max_time_seconds = 3
    min_words_per_line = 2
    max_words_per_line = 14

    # Caption has been accumulating for too long.
    if diff >= max_time_seconds:
        return True

    # End of sentence, and the line is at least the minimum length.
    ends_sentence = is_punctuation(word[-1])
    if ends_sentence and numberOfWordsInSentence >= min_words_per_line:
        return True

    # Line has reached the maximum word count.
    return numberOfWordsInSentence >= max_words_per_line
|
|
|
def seconds_to_timestring(elapsed):
    """Formats elapsed seconds as a WebVTT 'HH:MM:SS.mmm' timestamp.

    Bug fix: the previous format string '{}:{}:{:.3f}' produced
    unpadded components such as '0:0:1.500'; WebVTT requires
    zero-padded two-digit hours/minutes/seconds, which many players
    enforce strictly.

    Args:
      elapsed: non-negative number of seconds (may be fractional).

    Returns:
      A zero-padded 'HH:MM:SS.mmm' string.
    """
    seconds = elapsed % 60
    elapsed -= seconds

    hours = math.floor(elapsed / 60 / 60)
    elapsed -= hours * 60 * 60

    minutes = math.floor(elapsed / 60)

    # {:06.3f} pads seconds to two integer digits plus '.mmm'.
    return '{:02d}:{:02d}:{:06.3f}'.format(hours, minutes, seconds)
|
|
|
|
|
def gen_vtt(captions):
    """Renders a list of caption dicts as a WebVTT document string.

    Args:
      captions: dicts with 'transcript' and 'videoSegment'
        ('startTimeOffset'/'endTimeOffset' in seconds).

    Returns:
      The full WebVTT file contents.
    """
    cues = []
    for caption in captions:
        segment = caption['videoSegment']
        start = seconds_to_timestring(segment['startTimeOffset'])
        end = seconds_to_timestring(segment['endTimeOffset'])
        cues.append('{} --> {}\n{}\n\n'.format(
            start, end, caption['transcript']))
    # Header first, then every cue; join avoids repeated string +=.
    return 'WEBVTT\n\n' + ''.join(cues)
|
|
|
|
|
def _make_caption(transcript, confidence, start_time, end_time):
    """Builds one caption dict, trimming the trailing separator space."""
    if transcript and transcript[-1] == ' ':
        transcript = transcript[:-1]
    return {
        'transcript': transcript,
        'confidence': confidence,
        'videoSegment': {
            'startTimeOffset': start_time,
            'endTimeOffset': end_time,
        },
    }


def break_down_transcriptions(alternatives):
    """Splits transcription alternatives into caption-sized segments.

    Walks each alternative's timed words, accumulating a transcript
    until is_break_point() fires, then emits a caption dict covering
    the accumulated time range.

    Bug fixes: an alternative with no words no longer raises IndexError,
    and words after the last break point are no longer silently dropped
    (the trailing partial caption is now flushed).

    Args:
      alternatives: iterable of speech transcription alternatives, each
        with timed 'words' and a 'confidence'.

    Returns:
      A list of caption dicts with 'transcript', 'confidence' and
      'videoSegment' keys.
    """
    captions = []
    for alternative in alternatives:
        if not alternative.words:
            # Nothing to caption for this alternative.
            continue

        start_time = duration_to_seconds(alternative.words[0].start_time)
        end_time = start_time
        transcript = ''
        num_words_in_segment = 0
        for word in alternative.words:
            if not transcript:
                # First word of a new caption: restart the clock.
                start_time = duration_to_seconds(word.start_time)
            end_time = duration_to_seconds(word.end_time)
            diff = end_time - start_time
            transcript += word.word + ' '
            num_words_in_segment += 1
            if is_break_point(word.word, diff, num_words_in_segment):
                captions.append(_make_caption(
                    transcript, alternative.confidence, start_time, end_time))
                transcript = ''
                num_words_in_segment = 0

        # Flush any words accumulated after the last break point.
        if transcript:
            captions.append(_make_caption(
                transcript, alternative.confidence, start_time, end_time))
    return captions
|
|
|
|
|
# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()