|
import os |
|
import soundfile as sf |
|
from kokoro import KPipeline |
|
from datetime import datetime |
|
import subprocess |
|
|
|
def read_transcript(file_path):
    """Return the full contents of the transcript at ``file_path``.

    The file is decoded as UTF-8. Any failure is reported on stdout and then
    re-raised unchanged so the caller decides how to recover.

    Args:
        file_path: Path of the transcript file to load.

    Returns:
        The complete text of the file as a single string.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        Exception: Any other error raised while opening or reading the file.
    """
    try:
        with open(file_path, mode='r', encoding='utf-8') as transcript:
            contents = transcript.read()
    except FileNotFoundError:
        # A missing file gets its own message because it is the likeliest
        # user mistake.
        print(f"Error: Could not find {file_path}")
        raise
    except Exception as err:
        print(f"Error reading file: {err}")
        raise
    else:
        return contents
|
|
|
def generate_speech(text, output_dir, voice='af_bella', speed=0.9,
                    lang_code='a', sample_rate=24000):
    """Generate speech segments from text.

    The text is split on runs of newlines; each resulting segment is
    synthesized and written to ``output_dir`` as ``sentence_NNN.wav`` with a
    matching ``sentence_NNN.txt`` holding the segment's text.

    Args:
        text: The text to synthesize.
        output_dir: Directory that receives the segment files. It is created
            if it does not exist yet.
        voice: Voice identifier passed to the pipeline.
        speed: Speaking-rate value passed to the pipeline.
        lang_code: Language code for ``KPipeline`` ('a' for American English).
        sample_rate: Sample rate written into each WAV file.
            NOTE(review): 24000 is the value this script always used;
            presumably it matches the pipeline's output rate -- confirm
            before changing it.
    """
    # Do not rely on the caller having created the directory already.
    os.makedirs(output_dir, exist_ok=True)

    # Initialize pipeline
    pipeline = KPipeline(lang_code=lang_code)

    # Generate audio lazily, one segment per run of newlines.
    generator = pipeline(
        text,
        voice=voice,
        speed=speed,
        split_pattern=r'\n+',
    )

    # Process and save audio files
    for i, (gs, ps, audio) in enumerate(generator):
        print(f"\nProcessing segment {i}")
        print(f"Text: {gs}")  # graphemes/text
        print(f"Phonemes: {ps}")

        # Save audio file
        output_file = os.path.join(output_dir, f'sentence_{i:03d}.wav')
        sf.write(output_file, audio, sample_rate)

        # Save text content next to the audio so segments can be traced back.
        text_file = os.path.join(output_dir, f'sentence_{i:03d}.txt')
        with open(text_file, 'w', encoding='utf-8') as f:
            f.write(gs)

        print(f"Saved audio file: {output_file}")
        print(f"Saved text file: {text_file}")
|
|
|
def _segment_sort_key(name):
    """Order ``sentence_<n>.wav`` names by numeric index, other names last."""
    stem = name[len('sentence_'):-len('.wav')]
    if stem.isdecimal():
        return (0, int(stem), name)
    return (1, 0, name)


def merge_audio_files(output_dir):
    """Merge all audio segments using ffmpeg.

    Collects every ``sentence_*.wav`` file in ``output_dir``, writes an
    ffmpeg concat list (``files.txt``) into the same directory and
    stream-copies the segments into ``merged.wav`` there.

    Args:
        output_dir: Directory containing the segment WAV files; also receives
            ``files.txt`` and ``merged.wav``.

    Raises:
        FileNotFoundError: If ``output_dir`` contains no segment files (also
            raised by ``subprocess`` when the ``ffmpeg`` executable is
            missing).
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # Sort by numeric index: a plain string sort would put sentence_1000.wav
    # before sentence_101.wav once there are more than 999 segments.
    wav_files = sorted(
        (name for name in os.listdir(output_dir)
         if name.startswith('sentence_') and name.endswith('.wav')),
        key=_segment_sort_key,
    )
    if not wav_files:
        # Fail before touching the directory; ffmpeg cannot merge an empty list.
        raise FileNotFoundError(
            f"No sentence_*.wav files found in {output_dir}"
        )

    # Create the files.txt for ffmpeg
    file_list_path = os.path.join(output_dir, 'files.txt')
    with open(file_list_path, 'w', encoding='utf-8') as f:
        for wav_file in wav_files:
            # Inside a single-quoted concat entry a quote must be written
            # as '\'' (close quote, escaped quote, reopen quote).
            escaped = wav_file.replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")

    merged_output = 'merged.wav'

    # Paths are relative because ffmpeg runs with cwd=output_dir.
    ffmpeg_cmd = [
        'ffmpeg',
        '-y',  # overwrite an existing merged.wav instead of prompting
        '-f', 'concat',
        '-safe', '0',
        '-i', 'files.txt',
        '-c', 'copy',
        merged_output,
    ]

    print("\nMerging audio files...")
    # check=True: do not report success when ffmpeg actually failed.
    subprocess.run(ffmpeg_cmd, cwd=output_dir, check=True)
    print(f"Merged audio saved as: {output_dir}/{merged_output}")
|
|
|
def main():
    """Run the transcript-to-speech pipeline.

    Reads ``transcript.txt`` from the directory containing this script,
    synthesizes one audio segment per transcript chunk into a fresh
    ``output_<timestamp>`` directory (relative to the current working
    directory) and merges the segments into a single file.

    Returns:
        0 on success, 1 if any step raised an exception.
    """
    # Get script directory and transcript path
    script_dir = os.path.dirname(os.path.abspath(__file__))
    transcript_path = os.path.join(script_dir, 'transcript.txt')

    # Output directory name carries a timestamp so runs never collide.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = f'output_{timestamp}'

    try:
        # Read transcript
        print(f"Reading transcript from {transcript_path}...")
        text = read_transcript(transcript_path)

        # Create the directory only once there is something to synthesize,
        # so a missing transcript does not leave an empty output_* folder.
        os.makedirs(output_dir, exist_ok=True)

        # Generate speech segments
        print(f"\nGenerating audio files in {output_dir}...")
        generate_speech(text, output_dir)

        # Merge audio files
        merge_audio_files(output_dir)
    except Exception as e:
        # Top-level boundary: report and convert to a failing exit code.
        print(f"An error occurred: {e}")
        return 1

    return 0
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is used instead of the exit() helper, which is injected by the site
    # module and is not guaranteed to exist (e.g. under ``python -S``).
    raise SystemExit(main())