Last active
November 20, 2023 17:43
-
-
Save sanjarcode/5ee62e10ad9aabe26ae04f1f119efaaa to your computer and use it in GitHub Desktop.
Get transcripts of tree of .mp4 videos
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Get English transcripts of a tree of .mp4 files
## Cost incurred: 200 videos of ~3 minutes each => around ~$5 | |
## Other params: ~200 API calls; total ~40,000 seconds | |
## Cost estimate as per pricing chart: 683 min * 0.006 = ~4.098 | |
## Ignores non mp4 files | |
## Adds .mp3 files for each file, and then calls Whisper API and stores the .txt | |
## Nothing is deleted | |
## Resumable | |
## Skips .mp3 generation if it already exists
## Skips transcript call if .txt exists | |
## `export OPENAI_API_KEY="sk-...."` added in .zshrc file | |
import os | |
import sys | |
from openai import OpenAI | |
client = OpenAI() | |
def getTranscript(filePath):
    """Upload one audio file to the Whisper API and return the transcription.

    filePath: path of the audio file; it is opened in binary mode and passed
    to the module-level `client`.
    Returns the value produced by `client.audio.transcriptions.create` for
    response_format="text".
    """
    request_options = {
        "model": "whisper-1",
        "response_format": "text",  # commentable
    }
    with open(filePath, "rb") as audio_stream:
        return client.audio.transcriptions.create(file=audio_stream, **request_options)
## function to traverse over all .mp4 files in the folder and subfolders recursively | |
## do_each_file(file_path, folder_path) is called for each .mp4 found (not called when debug is set or when it is None)
def traverse_mp4_files(folder_path, do_each_file, sortLikeFileExplorer=False, debug=False):
    """Walk folder_path recursively and collect the paths of all .mp4 files.

    folder_path: root folder handed to os.walk.
    do_each_file: callback invoked as do_each_file(file_path, folder_path) for
        each .mp4; may be None to only collect paths.
    sortLikeFileExplorer: when true, the result goes through sort_file_paths.
    debug: when true, the file name is printed and the callback is NOT called.
    Returns the list of .mp4 paths (in os.walk order unless sorted).
    """
    collected = []
    for current_dir, _subdirs, names in os.walk(folder_path):
        for name in names:
            if not name.endswith(".mp4"):
                continue  # non-mp4 files are ignored
            full_path = os.path.join(current_dir, name)
            collected.append(full_path)
            if debug:
                print(name)
            elif do_each_file is not None:
                do_each_file(full_path, folder_path)
    return sort_file_paths(collected) if sortLikeFileExplorer else collected
def getMP4Transcript(filePath):
    """Return the transcript of one .mp4, creating the .mp3 and .txt next to it.

    Steps (each one is skipped when its output already exists, which makes the
    whole run resumable):
      1. `<name>-temp.mp3` is produced from the video with `ffmpeg -i`.
      2. `<name>.txt` is produced by sending the mp3 to getTranscript.

    filePath: path of the .mp4 file.
    Returns the transcript text (freshly fetched or read back from the .txt).
    Raises subprocess.CalledProcessError when ffmpeg exits with an error and
    OSError when ffmpeg cannot be started or a file cannot be read/written.
    """
    import subprocess  # local import: not part of this file's import header

    # Strip only the final extension; str.replace would also remove ".mp4"
    # occurring in the middle of the name.
    base_path, _extension = os.path.splitext(filePath)
    mp3Name = f"{base_path}-temp.mp3"
    transcript_file_path = f"{base_path}.txt"

    # generate mp3
    if os.path.exists(mp3Name):
        print('Skipping mp3 gen for', filePath)
    else:
        print('Generating mp3 for', filePath)
        try:
            # Argument list instead of a shell string: file names containing
            # quotes or other shell metacharacters are passed through intact.
            subprocess.run(["ffmpeg", "-i", filePath, mp3Name], check=True)
        except subprocess.CalledProcessError:
            # A half-written mp3 would be mistaken for a finished one on the
            # next run, so drop it before reporting the failure.
            if os.path.exists(mp3Name):
                os.remove(mp3Name)
            raise

    # get and generate transcript
    if os.path.exists(transcript_file_path):
        print('Skipping transc API call for', filePath)
        with open(transcript_file_path, "r", encoding="utf-8") as transcript_file:
            return transcript_file.read()

    print('Calling transc API for', filePath)
    transcript = getTranscript(mp3Name)
    with open(transcript_file_path, "w", encoding="utf-8") as transcript_file:
        transcript_file.write(transcript)
    # The intermediate mp3 is kept on purpose (nothing is deleted), so a rerun
    # can skip the ffmpeg step.
    return transcript
## module-level counters
## positive / total / negative are not referenced by the visible code
positive, total = 0, 0
negative = []
## hard-coded denominator of the "Done x / y" progress line
totalmp3 = 207
## number of files processed so far; incremented by the per-file callback
done = 0
def writeLongStringToFile(title, content, filePath = './done.md'):
    """Append one transcript to a markdown log file.

    title: label written on the `File:` line (callers pass the video path).
    content: transcript text; it is re-wrapped with split_sentences and empty
        lines are dropped.
    filePath: markdown file to append to; created if it does not exist.

    Best effort: any failure is printed and swallowed so that a batch run
    continues with the next file.
    """
    try:
        wrapped_lines = [line.strip() for line in split_sentences(content)]
        human_transcript = '\n'.join(line for line in wrapped_lines if line)
        entry = (
            "\n"
            f"File: `{title}`\n"
            "Transcript:\n"
            "```md\n"
            f"{human_transcript}\n"
            "```\n"
            "---\n"
        )
        # Explicit UTF-8: transcripts may contain non-ASCII text, and the
        # platform default encoding is not guaranteed to be able to encode it.
        with open(filePath, "a", encoding="utf-8") as storageFile:
            storageFile.write(entry)
    except Exception as e:
        print('Could not save transcript for', title, '-', e)
def getAndSaveTranscriptMP4(videoFilePath, rootPath, safeFilePath = './done.md'):
    """Transcribe one video, append the result to the collated markdown file, and report progress.

    videoFilePath: path of the .mp4 to process.
    rootPath: root folder of the traversal; accepted so the function matches
        the (file_path, folder_path) callback shape, not used here.
    safeFilePath: markdown file the transcript is appended to.

    Best effort: a failure for one video is printed (with the file name) and
    swallowed so the remaining videos are still processed.
    """
    global done
    try:
        transcript = getMP4Transcript(videoFilePath)
        writeLongStringToFile(videoFilePath, transcript, safeFilePath)
        # Count first, then print, so the first finished file reads "1 / N"
        # rather than "0 / N".
        done += 1
        print('Done', f'{done} / {totalmp3}', videoFilePath)
    except Exception as e:
        print('Failed', videoFilePath, e)
## utils | |
## sort array of file paths in a way a file explorer would alphabetically | |
def sort_file_paths(file_paths):
    """Return a new list of paths ordered case-insensitively by directory, then by file name."""
    return sorted(
        file_paths,
        key=lambda path: tuple(piece.lower() for piece in os.path.split(path)),
    )
## Given a very large paragraph (as string), divides it into max 80 chars lines | |
## takes care to not break words between | |
def split_sentences(sentence, max_len=80):
    """Wrap a long string into lines of at most max_len characters without breaking words.

    sentence: text to wrap. Text that already fits in max_len is returned
        unchanged as a single-element list.
    max_len: maximum line length (default 80).
    Returns a list of lines; words are re-joined with single spaces. A single
    word longer than max_len is kept whole on its own line.
    """
    if len(sentence) <= max_len:
        return [sentence]

    lines = []
    current_line = ""
    for word in sentence.split():
        if not current_line:
            # The first word of a line is always accepted; this avoids emitting
            # an empty line when the word itself is longer than max_len.
            current_line = word
        elif len(current_line) + 1 + len(word) <= max_len:  # 1 for the space
            current_line = f"{current_line} {word}"
        else:
            lines.append(current_line)
            current_line = word
    if current_line:
        lines.append(current_line)
    return lines
if __name__ == "__main__":
    # Fail with a readable message instead of an IndexError / a silent no-op.
    if len(sys.argv) < 2:
        sys.exit(f"Usage: python {sys.argv[0]} path-to-folder")
    rootPath = sys.argv[1]
    if not os.path.isdir(rootPath):
        sys.exit(f"Not a folder: {rootPath}")

    # Alternative: process files in os.walk order while traversing:
    #   traverse_mp4_files(rootPath, getAndSaveTranscriptMP4)
    # Collecting first and sorting collates the transcripts in alphabetical
    # (file explorer) order instead.
    sorted_files = traverse_mp4_files(rootPath, None, True)
    for file in sorted_files:
        getAndSaveTranscriptMP4(file, rootPath)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Get english transcripts of a tree of .mp4 files | |
## Ignores non mp4 files in the tree | |
## Adds .mp3 files for each file, using `ffmpeg -i`, and then calls Whisper API and stores the .txt | |
## Nothing is deleted | |
## Resumable | |
## Skips .mp3 generation if it already exists
## Skips transcript call if .txt exists | |
## `export OPENAI_API_KEY="sk-...."` added in .zshrc file | |
## Usage: `python video2Text.py path-to-folder` | |
## Example: `python video2Text.py ~/my-files/large-store/` | |
## Cost incurred: 200 videos of ~3 minutes each => around ~$5 | |
## Other params: ~200 API calls; total ~40,000 seconds | |
## Cost estimate as per pricing chart: 683 min * 0.006 = ~4.098 | |
## Time taken ~30 minutes | |
## Conclusion: too expensive for India, and non-dollar locales | |
## Workaround: try with local model | |
import os | |
from openai import OpenAI | |
client = OpenAI() | |
def getTranscript(filePath):
    """Upload one audio file to the Whisper API and return the transcription.

    filePath: path of the audio file; it is opened in binary mode and passed
    to the module-level `client`.
    Returns the value produced by `client.audio.transcriptions.create` for
    response_format="text".
    """
    whisper_options = dict(
        model="whisper-1",
        response_format="text",  # commentable
    )
    with open(filePath, "rb") as audio_stream:
        result = client.audio.transcriptions.create(file=audio_stream, **whisper_options)
    return result
## function to traverse over all .mp4 files in the folder and subfolders recursively | |
## each_file (file_path, folder_path) | |
def traverse_mp4_files(folder_path, each_file, debug=False):
    """Walk folder_path recursively, handling every .mp4 file found.

    folder_path: root folder handed to os.walk.
    each_file: callback invoked as each_file(file_path, folder_path) for each
        .mp4 when debug is false.
    debug: when true, the file name is printed and the callback is NOT called.
    Returns the list of .mp4 paths in os.walk order.
    """
    matches = []
    for current_dir, _subdirs, names in os.walk(folder_path):
        for name in names:
            if not name.endswith(".mp4"):
                continue  # non-mp4 files are ignored
            video_path = os.path.join(current_dir, name)
            matches.append(video_path)
            if debug:
                print(name)
                continue
            each_file(video_path, folder_path)
    return matches
def getMP4Transcript(filePath, root):
    """Return the transcript of one .mp4, creating the .mp3 and .txt next to it.

    Steps (each one is skipped when its output already exists, which makes the
    whole run resumable):
      1. `<name>-temp.mp3` is produced from the video with `ffmpeg -i`.
      2. `<name>.txt` is produced by sending the mp3 to getTranscript.

    filePath: path of the .mp4 file.
    root: root folder of the traversal; accepted for the caller's convenience,
        not used here.
    Returns the transcript text (freshly fetched or read back from the .txt).
    Raises subprocess.CalledProcessError when ffmpeg exits with an error and
    OSError when ffmpeg cannot be started or a file cannot be read/written.
    """
    import subprocess  # local import: not part of this file's import header

    # Strip only the final extension; str.replace would also remove ".mp4"
    # occurring in the middle of the name.
    base_path, _extension = os.path.splitext(filePath)
    mp3Name = f"{base_path}-temp.mp3"
    transcript_file_path = f"{base_path}.txt"

    # generate mp3
    if os.path.exists(mp3Name):
        print('Skipping mp3 gen for', filePath)
    else:
        print('Generating mp3 for', filePath)
        try:
            # Argument list instead of a shell string: file names containing
            # quotes or other shell metacharacters are passed through intact.
            subprocess.run(["ffmpeg", "-i", filePath, mp3Name], check=True)
        except subprocess.CalledProcessError:
            # A half-written mp3 would be mistaken for a finished one on the
            # next run, so drop it before reporting the failure.
            if os.path.exists(mp3Name):
                os.remove(mp3Name)
            raise

    # get and generate transcript
    if os.path.exists(transcript_file_path):
        print('Skipping transc API call for', filePath)
        with open(transcript_file_path, "r", encoding="utf-8") as transcript_file:
            return transcript_file.read()

    print('Calling transc API for', filePath)
    transcript = getTranscript(mp3Name)
    with open(transcript_file_path, "w", encoding="utf-8") as transcript_file:
        transcript_file.write(transcript)
    # The intermediate mp3 is kept on purpose (nothing is deleted), so a rerun
    # can skip the ffmpeg step.
    return transcript
## module-level counters
## positive / total / negative are not referenced by the visible code
positive, total = 0, 0
negative = []
## hard-coded denominator of the "Done x / y" progress line
totalmp3 = 207
## number of files processed so far; incremented by the per-file callback
done = 0
def f(*args):
    """Per-file callback: transcribe one video, stash the text, and report progress.

    args: exactly (filePath, root), as passed by traverse_mp4_files.

    The transcript is also appended to ./api-done.txt as a safety stash.
    Best effort: a failure for one video is printed (with the file name) and
    swallowed so the remaining videos are still processed.
    """
    filePath, root = args
    global done
    try:
        transcript = getMP4Transcript(filePath, root)
        # safety stash, just in case
        # Explicit UTF-8: transcripts may contain non-ASCII text, and the
        # platform default encoding is not guaranteed to be able to encode it.
        with open('./api-done.txt', "a", encoding="utf-8") as transcript_file:
            transcript_file.write(transcript)
            transcript_file.write('------' * 3)
        # Count first, then print, so the first finished file reads "1 / N"
        # rather than "0 / N".
        done += 1
        print('Done', f'{done} / {totalmp3}', filePath)
    except Exception as e:
        print('Failed', filePath, e)
import sys

## Run only when executed as a script, so importing this module does not start
## a traversal (and the paid API calls that come with it).
if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("Usage: python video2Text.py path-to-folder")
    traverse_mp4_files(sys.argv[1], f)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment