|
import os |
|
import moviepy.editor as mp |
|
import ffmpeg |
|
from openai import OpenAI |
|
from pydub import AudioSegment |
|
|
|
# OpenAI API credentials.
# Prefer the OPENAI_API_KEY environment variable over a key hard-coded in
# source control (a committed key is a security risk). The literal fallback
# preserves the old behavior of failing fast on an invalid key when unset.
key = os.environ.get("OPENAI_API_KEY", "sk-")

client = OpenAI(api_key=key)
|
|
|
def extract_audio_from_video(input_video, output_audio):
    """Extract the audio track of *input_video* into *output_audio*.

    Args:
        input_video: Path to the source video file.
        output_audio: Path where the extracted audio file is written.

    Returns:
        float: Duration of the video in seconds.
    """
    video = mp.VideoFileClip(input_video)
    try:
        # Read the duration before closing; close in finally so moviepy
        # releases its ffmpeg reader/file handles even if the write fails.
        duration = video.duration
        video.audio.write_audiofile(output_audio)
        print(f"Audio extracted: {os.path.exists(output_audio)}")
    finally:
        video.close()
    return duration
|
|
|
def transcribe_and_translate_audio(audio_file, src_lang="hi", target_lang="en"):
    """Translate the speech in a WAV file to English text via OpenAI Whisper.

    The audio is split into chunks whose raw payload stays under the API
    upload limit; each chunk is sent to the ``whisper-1`` translations
    endpoint and the partial results are joined.

    Args:
        audio_file: Path to a WAV file.
        src_lang: Source language hint (currently unused; whisper-1 detects
            the language automatically — kept for interface compatibility).
        target_lang: Target language (currently unused; the translations
            endpoint always outputs English).

    Returns:
        str: The concatenated translated transcript.
    """
    MAX_CHUNK_SIZE = 24 * 1024 * 1024  # 24 MB, safely under the 25 MB API limit

    audio = AudioSegment.from_wav(audio_file)
    duration_ms = len(audio)
    # Scale chunk length so each chunk's raw data fits the size limit.
    # max(1, ...) guards against a zero step for pathologically dense audio.
    chunk_duration_ms = max(
        1, int((MAX_CHUNK_SIZE / len(audio.raw_data)) * duration_ms)
    )

    translated_parts = []
    for i, chunk_start in enumerate(range(0, duration_ms, chunk_duration_ms)):
        chunk_end = min(chunk_start + chunk_duration_ms, duration_ms)
        chunk = audio[chunk_start:chunk_end]

        chunk_file = f"temp_chunk_{i}.wav"
        chunk.export(chunk_file, format="wav")
        try:
            print(f"Processing chunk {i+1}, size: {os.path.getsize(chunk_file)} bytes")

            with open(chunk_file, "rb") as audio_chunk:
                response = client.audio.translations.create(
                    model="whisper-1",
                    file=audio_chunk,
                    response_format="text"
                )
            translated_parts.append(response)
        finally:
            # Remove the temp chunk even when the API call raises, so a
            # failure does not litter the working directory.
            os.remove(chunk_file)

    return " ".join(translated_parts).strip()
|
|
|
def create_timed_subtitles(translated_text, video_duration):
    """Split *translated_text* into timed ``(start, end, text)`` subtitles.

    Words are paced uniformly across *video_duration*; a new subtitle is
    emitted roughly every 3 seconds of speech or at the end of a sentence.

    Args:
        translated_text: Full transcript as a single string.
        video_duration: Length of the video in seconds.

    Returns:
        list[tuple[float, float, str]]: Timed subtitle entries.
    """
    words = translated_text.split()
    total_words = len(words)
    if not words or video_duration <= 0:
        # No text (or no timeline to pace it against): nothing to emit.
        # Also avoids a ZeroDivisionError below.
        return []
    words_per_second = total_words / video_duration

    subtitles = []
    current_subtitle = ""
    word_count = 0
    start_time = 0

    for word in words:
        current_subtitle += word + " "
        word_count += 1

        # Create a new subtitle every ~3 seconds or at the end of a sentence.
        if word_count >= words_per_second * 3 or word.endswith('.'):
            end_time = start_time + (word_count / words_per_second)
            subtitles.append((start_time, end_time, current_subtitle.strip()))
            current_subtitle = ""
            word_count = 0
            start_time = end_time

    # Bug fix: flush any words left over after the loop. The original
    # dropped the transcript's tail when it neither ended a sentence nor
    # reached the 3-second threshold.
    if current_subtitle.strip():
        end_time = start_time + (word_count / words_per_second)
        subtitles.append((start_time, end_time, current_subtitle.strip()))

    return subtitles
|
|
|
def format_time(seconds):
    """Format *seconds* as an SRT timestamp (``HH:MM:SS,mmm``).

    Args:
        seconds: Non-negative time offset in seconds (int or float).

    Returns:
        str: Timestamp string, e.g. ``00:01:02,500``.
    """
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = seconds % 60
    # Bug fix: the SubRip format mandates a comma (not a dot) between the
    # seconds and milliseconds fields.
    return f"{hours:02d}:{minutes:02d}:{secs:06.3f}".replace(".", ",")
|
|
|
def create_srt_content(subtitles):
    """Render ``(start, end, text)`` subtitle tuples as one SRT document.

    Args:
        subtitles: Iterable of (start_seconds, end_seconds, text) tuples.

    Returns:
        str: The full SRT file content.
    """
    entries = []
    for index, (start, end, text) in enumerate(subtitles, 1):
        entries.append(
            f"{index}\n{format_time(start)} --> {format_time(end)}\n{text}\n\n"
        )
    return "".join(entries)
|
|
|
def generate_tts_audio(translated_text, output_audio_file):
    """Synthesize *translated_text* to speech and write it as an MP3 file.

    The text is split into chunks under the TTS API's 4096-character input
    limit; each chunk is synthesized separately and the audio is joined.

    Args:
        translated_text: Text to speak.
        output_audio_file: Path of the MP3 file to write.

    Raises:
        ValueError: If *translated_text* is empty (nothing to synthesize).
    """
    MAX_CHARS = 4000  # Slightly less than 4096 to be safe
    chunks = [translated_text[i:i+MAX_CHARS] for i in range(0, len(translated_text), MAX_CHARS)]
    if not chunks:
        # Bug fix: with no chunks, sum([]) below would be the int 0, which
        # has no .export() — fail with a clear error instead.
        raise ValueError("No text to synthesize")

    all_audio = []
    for i, chunk in enumerate(chunks):
        print(f"Processing TTS chunk {i+1}/{len(chunks)}")
        response = client.audio.speech.create(
            model="tts-1",
            voice="alloy",
            input=chunk
        )
        chunk_file = f"temp_chunk_{i}.mp3"
        # NOTE(review): stream_to_file is deprecated in newer openai SDKs;
        # switch to with_streaming_response when the SDK is upgraded.
        response.stream_to_file(chunk_file)
        try:
            all_audio.append(AudioSegment.from_mp3(chunk_file))
        finally:
            # Remove the temp file even if decoding it fails.
            os.remove(chunk_file)

    combined_audio = sum(all_audio)
    combined_audio.export(output_audio_file, format="mp3")
    print(f"TTS audio generated: {output_audio_file}")
|
|
|
def add_subtitles_and_replace_audio(input_video, srt_content, new_audio_file, output_video):
    """Burn subtitles into the video and swap in a new audio track.

    Writes *srt_content* to a temporary ``.srt`` file, then runs ffmpeg to
    render the subtitles onto the video stream while taking audio from
    *new_audio_file*, producing *output_video*. ffmpeg errors are caught and
    printed (best-effort: the function does not re-raise). The temporary
    ``.srt`` file is always removed.

    Args:
        input_video: Path to the source video.
        srt_content: Full SRT document text to burn in.
        new_audio_file: Path to the replacement audio track.
        output_video: Path for the resulting video file.
    """
    srt_file = "temp_subtitles.srt"
    with open(srt_file, "w", encoding="utf-8") as f:
        f.write(srt_content)

    try:
        video = ffmpeg.input(input_video)
        audio = ffmpeg.input(new_audio_file)

        (
            ffmpeg
            # concat with v=1/a=1 pairs the subtitle-filtered video stream
            # with the replacement audio stream into one output segment.
            .concat(
                video.filter('subtitles', srt_file),
                audio,
                v=1,
                a=1
            )
            .output(output_video)
            .overwrite_output()
            # Capture stdout/stderr so a failure's diagnostics are available
            # on the raised ffmpeg.Error below.
            .run(capture_stdout=True, capture_stderr=True)
        )
        print("\nSubtitles added and audio replaced successfully.")
        print(f"Output video created: {os.path.exists(output_video)}")
    except ffmpeg.Error as e:
        # Surface ffmpeg's stderr so the failure is diagnosable.
        print(f"\nFFmpeg Error: {e.stderr.decode()}")
    finally:
        os.remove(srt_file)
|
|
|
def process_video(input_video, output_video, src_lang="hi", target_lang="en"):
    """Run the full dubbing pipeline on one video.

    Steps: extract audio -> translate speech via Whisper -> build timed SRT
    subtitles -> synthesize English TTS audio -> mux subtitles and the new
    audio into *output_video*. Temporary files are removed even on failure.

    Args:
        input_video: Path to the source video.
        output_video: Path for the final dubbed video.
        src_lang: Source language hint, forwarded to transcription.
        target_lang: Target language hint, forwarded to transcription.
    """
    audio_file = "temp_audio.wav"
    tts_audio_file = "temp_tts_audio.mp3"

    try:
        print(f"Extracting audio from video: {input_video}")
        video_duration = extract_audio_from_video(input_video, audio_file)

        print("Transcribing and translating audio...")
        translated_text = transcribe_and_translate_audio(audio_file, src_lang, target_lang)

        print("Creating timed subtitles...")
        timed_subtitles = create_timed_subtitles(translated_text, video_duration)

        print("Creating SRT content...")
        srt_content = create_srt_content(timed_subtitles)

        print("Generating TTS audio...")
        generate_tts_audio(translated_text, tts_audio_file)

        print("Adding subtitles and replacing audio in video...")
        add_subtitles_and_replace_audio(input_video, srt_content, tts_audio_file, output_video)
    finally:
        # Bug fix: clean up temp files even when a pipeline step raises.
        # Guard each removal since a failed step may not have created its file.
        for temp_file in (audio_file, tts_audio_file):
            if os.path.exists(temp_file):
                os.remove(temp_file)

    if os.path.exists(output_video):
        print(f"Process completed! Output saved to {output_video}")
    else:
        print(f"Error: Output file not created at {output_video}")
|
|
|
if __name__ == "__main__":
    # Example invocation: dub a Hindi drone-review video into English.
    source_path = "cheap_drone_hindi.mp4"
    result_path = "cheap_drone_hindi_tts.mp4"
    process_video(source_path, result_path, src_lang="hi", target_lang="en")