Last active
September 13, 2024 02:13
-
-
Save ericboehs/d6bcb3e10ee2267b1a7a5623f46e1f89 to your computer and use it in GitHub Desktop.
Generates a video for my daughters to use to study for their vocab tests. Uses OpenAI for TTS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'json' | |
require 'fileutils' | |
require 'net/http' | |
require 'uri' | |
require 'digest' | |
# API key for OpenAI's TTS endpoint; the script cannot run without it.
OPENAI_API_KEY = ENV['OPENAI_API_KEY']
unless OPENAI_API_KEY && !OPENAI_API_KEY.empty?
  puts "Error: OPENAI_API_KEY environment variable is not set."
  exit 1
end
# Prints usage information for the script, then terminates with exit status 1.
def show_help_and_exit
  script = File.basename(__FILE__)
  puts "Usage: ruby #{script} <input_json_file>"
  puts "Example: ruby #{script} vocabulary.json"
  puts "\nThis script creates 1080p vocabulary videos (light and dark versions) with voice narration from a JSON file containing words and definitions."
  exit 1
end
# --- Command-line handling -------------------------------------------------
# The single required argument is the path to a JSON file of vocabulary items.
if ARGV.empty?
  puts "Error: No input file specified."
  show_help_and_exit
end

input_file = ARGV[0]

unless File.exist?(input_file)
  puts "Error: File '#{input_file}' not found."
  show_help_and_exit
end

# Parse the vocabulary list; bail out with a clear message on malformed input.
begin
  vocabulary = JSON.parse(File.read(input_file))
rescue JSON::ParserError
  puts "Error: Invalid JSON format in '#{input_file}'."
  exit 1
rescue => e
  puts "Error reading file: #{e.message}"
  exit 1
end

# Working directories for intermediate artifacts (frames, audio, segments).
%w[frames_light frames_dark audio audio_cache segments_light segments_dark].each do |dir|
  FileUtils.mkdir_p(dir)
end
# Renders a single 1920x1080 PNG frame containing centered, word-wrapped text.
#
# text      - the text to render (may be empty for a blank frame)
# filename  - output PNG path
# font_size - ImageMagick pointsize (default 80)
# dark      - when true, black background with light-gray text; otherwise
#             white background with black text
#
# Exits the process if ImageMagick fails to produce the file.
def create_frame(text, filename, font_size = 80, dark = false)
  width = 1920
  height = 1080
  margin = (width * 0.1).to_i # 10% margin on every side
  inner_width = width - (2 * margin)
  inner_height = height - (2 * margin)
  bg_color = dark ? 'black' : 'white'
  text_color = dark ? '#E0E0E0' : 'black' # light gray reads better on black

  # Pass arguments as an array so no shell is involved. This replaces the
  # previous shell-string command, whose quote "escaping" was broken
  # (gsub("'", "\'") was a no-op) and which was open to shell injection
  # through the rendered text.
  command = [
    'magick',
    '-size', "#{inner_width}x#{inner_height}",
    'xc:none',
    '-font', 'Arial',
    '-pointsize', font_size.to_s,
    '-gravity', 'center',
    '-background', 'none',
    '-fill', text_color,
    "caption:#{text}",
    '-background', bg_color,
    '-extent', "#{width}x#{height}",
    '-gravity', 'center',
    '-composite', filename
  ]
  system(*command)

  unless File.exist?(filename)
    puts "Error: Failed to create frame #{filename}"
    puts "Command used: #{command.join(' ')}"
    exit 1
  end
end
# Synthesizes speech for +text+ via the OpenAI TTS API, writing MP3 data to
# +filename+. Results are cached in audio_cache/ keyed by an MD5 of the text,
# so repeated runs don't re-bill the API for identical phrases. Rate-limit
# responses are retried (up to 25 times); any other API error aborts.
def generate_speech(text, filename)
  cache_filename = "audio_cache/#{Digest::MD5.hexdigest(text)}.mp3"
  if File.exist?(cache_filename)
    puts "Using cached audio for: #{text}"
    FileUtils.cp(cache_filename, filename)
    return
  end

  puts "Generating audio for: #{text}"
  uri = URI.parse("https://api.openai.com/v1/audio/speech")
  request = Net::HTTP::Post.new(uri)
  request["Authorization"] = "Bearer #{OPENAI_API_KEY}"
  request["Content-Type"] = "application/json"
  request.body = JSON.dump({
    "model" => "tts-1-hd",
    "input" => text,
    "voice" => "nova"
  })

  retry_count = 0
  max_retries = 25

  loop do
    response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) do |http|
      http.request(request)
    end

    if response.code == "200"
      File.open(filename, "wb") do |file|
        file.write(response.body)
      end
      FileUtils.cp(filename, cache_filename) # populate the cache for next run
      break
    else
      error_response = begin
        JSON.parse(response.body)
      rescue JSON::ParserError
        {} # non-JSON error body (e.g. HTML from a proxy); treated as fatal below
      end

      if error_response['error'] && error_response['error']['code'] == 'rate_limit_exceeded'
        # The API's message usually says how long to wait; fall back to 1s if
        # it doesn't match, so a failed parse (to_i => 0) can't turn this
        # into a zero-sleep busy loop hammering the API.
        wait_time = error_response['error']['message'].scan(/try again in (\d+)s/).flatten.first.to_i
        wait_time = 1 if wait_time < 1
        puts "Rate limit reached, retrying in #{wait_time} seconds..."
        sleep(wait_time)
        retry_count += 1
        if retry_count > max_retries
          puts "Max retries reached. Exiting..."
          exit 1
        end
      else
        puts "Error generating speech: #{response.body}"
        exit 1
      end
    end
  end
end
# Returns the duration of an audio file in seconds (Float), as reported by
# ffprobe. Returns 0.0 if ffprobe produces no parsable output.
def get_audio_duration(filename)
  # Argument-array form avoids the shell entirely, so paths containing
  # spaces or metacharacters are handled correctly (the old backtick
  # command interpolated the path unquoted).
  output = IO.popen(
    ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
     '-of', 'default=noprint_wrappers=1:nokey=1', filename],
    &:read
  )
  output.to_f
end
# Writes +duration+ seconds of 44.1 kHz stereo silence to +filename+ (MP3).
def create_silent_audio(duration, filename)
  # anullsrc generates silence; array-form system keeps the path shell-safe.
  system('ffmpeg', '-f', 'lavfi', '-i', 'anullsrc=r=44100:cl=mono',
         '-t', duration.to_s, '-ar', '44100', '-ac', '2', filename)
end
# Replaces part-of-speech abbreviations (e.g. "n.", "adj.") with their full
# names ("noun.", "adjective.") so the TTS voice reads them out naturally.
# An abbreviation is only expanded when it stands alone: preceded by a word
# boundary and followed by whitespace or the end of the string.
def expand_part_of_speech(text)
  expansions = {
    'n.' => 'noun.',
    'v.' => 'verb.',
    'adj.' => 'adjective.',
    'adv.' => 'adverb.',
    'prep.' => 'preposition.',
    'conj.' => 'conjunction.',
    'pron.' => 'pronoun.',
    'interj.' => 'interjection.'
  }
  # Single pass over the text: match any abbreviation and look up its
  # expansion, rather than running one gsub per abbreviation.
  pattern = /\b(?:#{expansions.keys.map { |abbr| Regexp.escape(abbr) }.join('|')})(?=\s|\z)/
  text.gsub(pattern) { |abbr| expansions[abbr] }
end
segment_list_light = []
segment_list_dark = []

# One second of blank video (white for light mode, black for dark mode)
# leads each video so playback doesn't start abruptly on the first word.
create_frame("", "frames_light/initial_white.png", 80, false)
create_frame("", "frames_dark/initial_black.png", 80, true)
create_silent_audio(1, "audio/initial_silence.mp3")

[
  ["frames_light/initial_white.png", "segments_light/initial.mp4", segment_list_light],
  ["frames_dark/initial_black.png", "segments_dark/initial.mp4", segment_list_dark]
].each do |frame, segment, list|
  system("ffmpeg -loop 1 -i #{frame} -i audio/initial_silence.mp3 -c:v libx264 -t 1 -pix_fmt yuv420p -vf scale=1920:1080 -ar 44100 -ac 2 #{segment}")
  list << segment
end
# Build per-word video segments: a 5-second "word" card followed by a
# "definition" card held for the narration plus a short tail.
vocabulary.each_with_index do |item, index|
  word = item['word']
  definition = item['definition']

  # --- Word segment --------------------------------------------------------
  word_frame_light = "frames_light/word_#{index}.png"
  word_frame_dark = "frames_dark/word_#{index}.png"
  create_frame(word, word_frame_light, 200, false)
  create_frame(word, word_frame_dark, 200, true)

  word_audio = "audio/word_#{index}.mp3"

  # Split "word pos." into the word and its part-of-speech tag so the
  # abbreviation can be expanded for narration (e.g. "n." -> "noun.").
  word_parts = word.split(/\s+(?=[a-z]+\.)/, 2)
  base_word = word_parts[0]
  pos = word_parts[1] || ''
  expanded_pos = expand_part_of_speech(pos)
  expanded_word = "#{base_word} #{expanded_pos}".strip
  puts "Expanded word: #{expanded_word}" # Debug output

  generate_speech(expanded_word, word_audio)
  word_audio_duration = get_audio_duration(word_audio)

  # Pad the narration with silence so the word card is held for 5 seconds.
  combined_audio = "audio/combined_#{index}.mp3"
  silence_duration = 5 - word_audio_duration
  if silence_duration > 0
    silence_audio = "audio/silence_#{index}.mp3"
    create_silent_audio(silence_duration, silence_audio)
    system("ffmpeg -i #{word_audio} -i #{silence_audio} -filter_complex '[0:a][1:a]concat=n=2:v=0:a=1[out]' -map '[out]' #{combined_audio}")
  else
    # Narration already fills (or exceeds) 5 seconds: generating silence
    # with a zero/negative -t would make ffmpeg fail, so use it as-is.
    FileUtils.cp(word_audio, combined_audio)
  end

  word_segment_light = "segments_light/word_#{index}.mp4"
  word_segment_dark = "segments_dark/word_#{index}.mp4"
  system("ffmpeg -loop 1 -i #{word_frame_light} -i #{combined_audio} -c:v libx264 -c:a aac -ar 44100 -pix_fmt yuv420p -shortest #{word_segment_light}")
  system("ffmpeg -loop 1 -i #{word_frame_dark} -i #{combined_audio} -c:v libx264 -c:a aac -ar 44100 -pix_fmt yuv420p -shortest #{word_segment_dark}")
  segment_list_light << word_segment_light
  segment_list_dark << word_segment_dark

  # --- Definition segment --------------------------------------------------
  definition_frame_light = "frames_light/def_#{index}.png"
  definition_frame_dark = "frames_dark/def_#{index}.png"
  create_frame(definition, definition_frame_light, 80, false)
  create_frame(definition, definition_frame_dark, 80, true)

  definition_audio = "audio/def_#{index}.mp3"
  expanded_definition = expand_part_of_speech(definition)
  # Semicolons read poorly in TTS; periods give a natural pause instead.
  tts_definition = expanded_definition.gsub(';', '.')
  puts "Expanded definition: #{tts_definition}" # Debug output

  generate_speech(tts_definition, definition_audio)
  definition_audio_duration = get_audio_duration(definition_audio)

  # adelay=1000 gives a 1-second lead-in before the narration; the extra
  # 3 seconds of -t hold the card after the narration finishes.
  definition_segment_light = "segments_light/def_#{index}.mp4"
  definition_segment_dark = "segments_dark/def_#{index}.mp4"
  system("ffmpeg -loop 1 -i #{definition_frame_light} -i #{definition_audio} -filter_complex \"[1:a]adelay=1000|1000[a]\" -map 0:v -map \"[a]\" -c:v libx264 -c:a aac -ar 44100 -t #{definition_audio_duration + 3} -pix_fmt yuv420p -vf scale=1920:1080 #{definition_segment_light}")
  system("ffmpeg -loop 1 -i #{definition_frame_dark} -i #{definition_audio} -filter_complex \"[1:a]adelay=1000|1000[a]\" -map 0:v -map \"[a]\" -c:v libx264 -c:a aac -ar 44100 -t #{definition_audio_duration + 3} -pix_fmt yuv420p -vf scale=1920:1080 #{definition_segment_dark}")
  segment_list_light << definition_segment_light
  segment_list_dark << definition_segment_dark
end
# Write ffmpeg concat-demuxer style file lists for both variants (these are
# kept on disk for debugging / manual re-concatenation).
{
  'segment_list_light.txt' => segment_list_light,
  'segment_list_dark.txt' => segment_list_dark
}.each do |list_file, segments|
  File.write(list_file, segments.map { |segment| "file '#{segment}'\n" }.join)
end

# Output names derive from the input JSON's basename.
base_name = File.basename(input_file, File.extname(input_file))
output_file_light = "#{base_name}_video_light.mp4"
output_file_dark = "#{base_name}_video_dark.mp4"
# Concatenates the given video segments into +output_file+ by re-encoding
# through ffmpeg's concat filter.
#
# segment_list - Array of paths to .mp4 segments, in playback order
# output_file  - path of the final video to create
def concatenate_segments(segment_list, output_file)
  # Builds "[0:v][0:a][1:v][1:a]...concat=n=K:v=1:a=1[outv][outa]".
  filter_complex = segment_list.each_index.map { |i| "[#{i}:v][#{i}:a]" }.join
  filter_complex += "concat=n=#{segment_list.size}:v=1:a=1[outv][outa]"

  # Argument-array form avoids the shell, so segment/output paths containing
  # spaces or metacharacters can't break (or hijack) the command — the old
  # shell-string version interpolated every path unquoted.
  command = ['ffmpeg']
  segment_list.each { |segment| command.concat(['-i', segment]) }
  command.concat([
    '-filter_complex', filter_complex,
    '-map', '[outv]', '-map', '[outa]',
    '-c:v', 'libx264', '-c:a', 'aac', '-ar', '44100', '-pix_fmt', 'yuv420p',
    output_file
  ])

  puts "Executing FFmpeg command for #{output_file}:"
  puts command.join(' ')
  system(*command)
  puts "Video created: #{output_file}"
end
# Render both final videos, then report what was produced and what was kept.
concatenate_segments(segment_list_light, output_file_light)
concatenate_segments(segment_list_dark, output_file_dark)

[
  "Light version video created: #{output_file_light}",
  "Dark version video created: #{output_file_dark}",
  "Individual segments and audio files have been kept for debugging purposes.",
  "Please check the 'segments_light', 'segments_dark', and 'audio' directories for individual files."
].each { |message| puts message }

# Clean up temporary files (optional)
# FileUtils.rm_rf('frames_light')
# FileUtils.rm_rf('frames_dark')
# FileUtils.rm_rf('audio')
# FileUtils.rm_rf('segments_light')
# FileUtils.rm_rf('segments_dark')
# File.delete('segment_list_light.txt')
# File.delete('segment_list_dark.txt')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment