Last active
May 19, 2025 00:45
-
-
Save bigsnarfdude/916994ebe132491826c223554f8e0dbd to your computer and use it in GitHub Desktop.
transcribe.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Video Processing Script | |
This script processes a list of video files from a CSV file: | |
1. Downloads the MP4 videos from URLs | |
2. Converts the videos to MP3 format | |
3. Generates transcripts from the MP3 files | |
4. Stores files in appropriate folders | |
5. Deletes the original MP4 files after processing | |
6. Logs all activities | |
Features: | |
- Robust resume capability if interrupted | |
- Progress tracking and state persistence | |
- Partial file download resumption | |
- Comprehensive error handling and logging | |
Requirements: | |
- Python 3.8+ | |
- ffmpeg: for video-to-audio conversion | |
- NVIDIA NeMo (nemo_toolkit[asr]): runs the Parakeet TDT speech-to-text model | |
Install required Python packages with (ffmpeg itself must be installed separately and on PATH): | |
pip install requests tqdm pandas "nemo_toolkit[asr]" | |
""" | |
import os | |
import sys | |
import csv | |
import time | |
import json | |
import requests | |
import logging | |
import subprocess | |
import pandas as pd | |
from tqdm import tqdm | |
from datetime import datetime | |
from pathlib import Path | |
import shutil | |
import hashlib | |
import pickle | |
# Removed whisper import since we're using Parakeet | |
# Create a state tracker class to manage resumption | |
class ProcessingState:
    """Track and persist per-video pipeline progress so an interrupted run can resume.

    The state is pickled to ``state_file_path`` after every change and holds:
      * ``completed_videos`` -- set of video names that finished every stage;
      * ``in_progress`` -- video name -> name of the stage last entered;
      * ``download_sizes`` -- video name -> byte count of a partial download.

    Saves go to a temporary file that is then swapped into place with
    ``os.replace``, so a crash or signal in the middle of a save leaves the
    previous state file intact instead of a truncated one.
    """

    def __init__(self, state_file_path="processing_state.pkl"):
        self.state_file_path = state_file_path
        self.completed_videos = set()
        self.in_progress = {}  # Maps video name to current processing stage
        self.download_sizes = {}  # Tracks partial downloads by size
        self.load_state()

    def load_state(self):
        """Load previous state from file if it exists.

        An unreadable state file is logged, copied to
        ``<state_file_path>.bak.<unix time>`` for inspection, and otherwise
        ignored, leaving the tracker empty.
        """
        if not os.path.exists(self.state_file_path):
            return
        try:
            # NOTE: pickle.load can execute arbitrary code. This file is only ever
            # written by this script; never point state_file_path at untrusted data.
            with open(self.state_file_path, 'rb') as f:
                state = pickle.load(f)
            completed = set(state.get('completed_videos', set()))
            in_progress = dict(state.get('in_progress', {}))
            download_sizes = dict(state.get('download_sizes', {}))
        except Exception as e:
            logging.error(f"Error loading state file: {e}")
            # Create backup of corrupted state file
            if os.path.exists(self.state_file_path):
                backup_path = f"{self.state_file_path}.bak.{int(time.time())}"
                shutil.copy(self.state_file_path, backup_path)
                logging.info(f"Created backup of corrupted state file: {backup_path}")
            return
        # Assign only after every field parsed, so a bad file never leaves the
        # tracker half-populated.
        self.completed_videos = completed
        self.in_progress = in_progress
        self.download_sizes = download_sizes
        logging.info(f"Loaded previous state: {len(self.completed_videos)} completed, "
                     f"{len(self.in_progress)} in progress")

    def save_state(self):
        """Persist the current state; errors are logged, not raised."""
        state = {
            'completed_videos': self.completed_videos,
            'in_progress': self.in_progress,
            'download_sizes': self.download_sizes
        }
        tmp_path = f"{self.state_file_path}.tmp"
        try:
            with open(tmp_path, 'wb') as f:
                pickle.dump(state, f)
            # A single rename (atomic on POSIX): readers see the old file or the
            # complete new one, never a partially written pickle.
            os.replace(tmp_path, self.state_file_path)
        except Exception as e:
            logging.error(f"Error saving state file: {e}")

    def mark_completed(self, video_name):
        """Mark a video as completely processed and drop its in-progress stage."""
        self.in_progress.pop(video_name, None)
        self.completed_videos.add(video_name)
        self.save_state()

    def mark_stage(self, video_name, stage):
        """Record ``stage`` as the current processing stage of a video."""
        self.in_progress[video_name] = stage
        self.save_state()

    def update_download_size(self, video_name, size):
        """Record the bytes downloaded so far, for partial download resumption."""
        self.download_sizes[video_name] = size
        self.save_state()

    def get_download_size(self, video_name):
        """Return the recorded partial download size for a video (0 if none)."""
        return self.download_sizes.get(video_name, 0)

    def is_completed(self, video_name):
        """Return True if a video has been completely processed."""
        return video_name in self.completed_videos

    def get_stage(self, video_name):
        """Return the recorded processing stage for a video, or None."""
        return self.in_progress.get(video_name, None)
# Set up logging | |
def setup_logging():
    """Attach a timestamped log file handler and a stdout handler to the root logger.

    The log file is ``logs/video_processing_<YYYYmmdd_HHMMSS>.log``; the ``logs``
    directory is created if missing. Both handlers share one formatter, and the
    root logger level is set to INFO.

    Returns:
        The root ``logging.Logger``.
    """
    logs_path = Path("logs")
    logs_path.mkdir(exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    shared_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    new_handlers = (
        logging.FileHandler(logs_path / f"video_processing_{stamp}.log"),
        logging.StreamHandler(sys.stdout),
    )

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    for handler in new_handlers:
        handler.setFormatter(shared_formatter)
        root.addHandler(handler)
    return root
# Add a signal handler to gracefully handle interruptions | |
def add_signal_handlers():
    """Install SIGINT (Ctrl+C) and SIGTERM handlers that log a resume hint and exit.

    The process exits with status ``128 + signal number`` (130 for SIGINT, 143 for
    SIGTERM). This is the conventional "terminated by signal" status, so wrappers
    and schedulers can tell an interrupted run from a successful one; the original
    exit status of 0 reported success.
    """
    import signal

    def signal_handler(sig, frame):
        logging.warning(f"Received signal {sig}, shutting down gracefully...")
        logging.info("You can resume processing by running the script again")
        sys.exit(128 + sig)

    # Register signal handlers
    signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
    signal.signal(signal.SIGTERM, signal_handler)  # Termination signal
# Function to create directory structure | |
def create_directories():
    """Create the working folders and, when absent, a ``.gitignore`` for bulky files.

    The folders are downloads/, mp3/, transcripts/ and logs/. Existing folders are
    left untouched but are still logged as "Created". The ``.gitignore`` is written
    only if none exists, so a user's own file is never overwritten.
    """
    for folder in ("downloads", "mp3", "transcripts", "logs"):
        Path(folder).mkdir(exist_ok=True)
        logging.info(f"Created directory: {folder}")

    ignore_file = Path(".gitignore")
    if ignore_file.exists():
        return

    # Keep large downloads, partial files and the resume state out of version control.
    ignore_entries = [
        "# Ignore large files and temporary files",
        "downloads/",
        "*.mp4",
        "*.part",
        "*.temp",
        "processing_state.pkl",
    ]
    with open(ignore_file, "w") as handle:
        handle.write("".join(f"{entry}\n" for entry in ignore_entries))
    logging.info("Created .gitignore file")
# Function to download a file with progress bar and resume capability | |
def _content_range_total(content_range):
    """Return the complete-length field of a Content-Range header value, or None.

    Accepts both ``bytes 100-199/1000`` and the 416 form ``bytes */1000``; an
    unknown length (``*``) or a missing/malformed header yields None.
    """
    if not content_range:
        return None
    try:
        return int(content_range.rsplit('/', 1)[-1])
    except ValueError:
        return None


def _expected_total_size(response, current_size):
    """Return the full remote file size in bytes, or 0 when it is unknown.

    Content-Length covers only the bytes in this response, so the bytes already
    on disk are added to it. A Content-Range header, when present, states the
    full size directly and takes precedence.
    """
    total_size = 0
    content_length = response.headers.get('content-length')
    if content_length is not None:
        try:
            total_size = current_size + int(content_length)
        except ValueError:
            total_size = 0
    range_total = _content_range_total(response.headers.get('content-range'))
    if range_total is not None:
        total_size = range_total
    return total_size


def _finalize_download(temp_path, target_path, state, video_name):
    """Move the finished ``.part`` file into place and clear its resume entry."""
    # os.replace (unlike os.rename) also overwrites an existing target on Windows,
    # e.g. an earlier copy that failed its integrity check.
    os.replace(temp_path, target_path)
    # Clear the download size from state since download is complete
    state.download_sizes.pop(video_name, None)
    state.save_state()
    logging.info(f"Download completed: {target_path}")


def download_file(url, target_path, state, video_name, timeout=(10, 60)):
    """Download ``url`` to ``target_path`` with a progress bar and resume support.

    Data is streamed into ``<target_path>.part``. If that file already exists, an
    HTTP Range request continues from its current size. Servers that ignore the
    range (status 200) cause a restart from byte 0. Progress is recorded in
    ``state`` roughly every MiB, and on failure, so a later run can resume. The
    ``.part`` file is renamed to ``target_path`` only when the download completes
    (and, if the size is known, matches it).

    Args:
        url: Source URL.
        target_path: Final destination path (str or Path).
        state: ProcessingState used to record partial download sizes.
        video_name: Key under which the download size is tracked in ``state``.
        timeout: ``requests`` timeout (connect, read) in seconds, so a stalled
            connection fails and can be resumed instead of hanging forever.

    Returns:
        True if the file is fully downloaded, False otherwise. Errors are logged,
        not raised.
    """
    temp_path = f"{target_path}.part"
    current_size = 0
    block_size = 1024 * 8  # 8 KB
    save_interval = 1024 * 1024  # persist progress about every 1 MiB
    try:
        logging.info(f"Starting download: {url}")

        # Trust the bytes actually on disk over whatever the state recorded.
        if os.path.exists(temp_path):
            current_size = os.path.getsize(temp_path)
        recorded_size = state.get_download_size(video_name)
        if current_size != recorded_size:
            logging.warning(f"Download size mismatch for {video_name}. File: {current_size}, State: {recorded_size}")
            state.update_download_size(video_name, current_size)

        # Set up headers for resume
        headers = {}
        if current_size > 0:
            headers['Range'] = f'bytes={current_size}-'
            logging.info(f"Resuming download from byte {current_size}")

        with requests.get(url, stream=True, headers=headers, timeout=timeout) as response:
            if current_size > 0 and response.status_code == 416:
                # Range Not Satisfiable: our offset is at or past the end of the
                # remote file. Either the partial file is already complete (e.g. a
                # crash before the rename) or it is bogus and must be discarded,
                # otherwise every future run would fail here the same way.
                remote_total = _content_range_total(response.headers.get('content-range'))
                if remote_total == current_size:
                    logging.info(f"Partial file already complete for {video_name}.")
                    _finalize_download(temp_path, target_path, state, video_name)
                    return True
                logging.warning(f"Server rejected resume range for {video_name}. Discarding partial download.")
                os.remove(temp_path)
                current_size = 0
                state.update_download_size(video_name, 0)
                return False

            # If we tried to resume but the server doesn't support it
            if current_size > 0 and response.status_code == 200:
                logging.warning("Server doesn't support resume. Starting from beginning.")
                current_size = 0
                if os.path.exists(temp_path):
                    os.remove(temp_path)

            # If we're resuming and the server acknowledged it
            if current_size > 0 and response.status_code == 206:
                logging.info("Server supports resume. Continuing download.")

            response.raise_for_status()

            total_size = _expected_total_size(response, current_size)
            # Append when resuming, otherwise start a fresh file
            mode = 'ab' if current_size > 0 else 'wb'
            last_saved = current_size
            with open(temp_path, mode) as file, tqdm(
                total=total_size,
                initial=current_size,
                unit='iB',
                unit_scale=True,
                desc=f"Downloading {Path(target_path).name}"
            ) as progress_bar:
                for data in response.iter_content(block_size):
                    if not data:  # skip keep-alive chunks
                        continue
                    file.write(data)
                    progress_bar.update(len(data))
                    current_size += len(data)
                    # Chunk sizes vary, so compare against the last saved size
                    # instead of testing for an exact MiB boundary.
                    if current_size - last_saved >= save_interval:
                        state.update_download_size(video_name, current_size)
                        last_saved = current_size

        # Verify download completed successfully
        if total_size > 0 and current_size != total_size:
            logging.warning(f"Downloaded file size {current_size} does not match expected size {total_size} for {url}")
            state.update_download_size(video_name, current_size)
            return False

        _finalize_download(temp_path, target_path, state, video_name)
        return True
    except requests.exceptions.ConnectionError as e:
        logging.error(f"Connection error for {url}: {str(e)}")
        state.update_download_size(video_name, current_size)
        return False
    except Exception as e:
        logging.error(f"Download failed for {url}: {str(e)}")
        state.update_download_size(video_name, current_size)
        return False
# Function to convert MP4 to MP3 | |
def convert_to_mp3(mp4_path, mp3_path):
    """Extract the audio track of ``mp4_path`` into an MP3 at ``mp3_path`` via ffmpeg.

    ffmpeg writes to ``<mp3_path>.part``, which is renamed to ``mp3_path`` only
    after ffmpeg exits successfully. An interrupted or failed conversion therefore
    never leaves a truncated MP3 that a later integrity check could accept; any
    leftover ``.part`` file is removed.

    Returns:
        True on success, False otherwise (including a missing ffmpeg binary or an
        input without an audio stream). Errors are logged, not raised.
    """
    temp_path = f"{mp3_path}.part"
    try:
        logging.info(f"Converting {mp4_path} to MP3")
        command = [
            "ffmpeg",
            "-i", str(mp4_path),  # Input file
            "-q:a", "0",          # Best audio quality
            "-map", "a",          # Extract only audio
            "-f", "mp3",          # Explicit format: the .part suffix does not imply one
            "-y",                 # Overwrite output file if it exists
            temp_path             # Temporary output, renamed on success
        ]
        # errors="replace": ffmpeg's stderr may contain bytes that are invalid in
        # the locale encoding; decoding failures must not mask the real result.
        process = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace"
        )
        if process.returncode != 0:
            logging.error(f"Conversion failed: {process.stderr}")
            return False
        os.replace(temp_path, mp3_path)
        logging.info(f"Conversion successful: {mp3_path}")
        return True
    except Exception as e:
        logging.error(f"Error converting file {mp4_path}: {str(e)}")
        return False
    finally:
        # Best-effort cleanup of a partial output; a failure here is not worth
        # overriding the conversion result.
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass
# Function to verify file integrity | |
def verify_file_integrity(file_path):
    """Check that a file looks usable before the pipeline relies on it.

    ``.mp4`` and ``.mp3`` files (extension matched case-insensitively) are fully
    decoded with ``ffmpeg -i <file> -f null -`` and pass only if ffmpeg exits with
    status 0. Any other file passes if it can be opened and is non-empty.

    Returns:
        True if the check passes, False otherwise, including when ffmpeg is
        missing or the file cannot be opened (the error is logged).
    """
    path_str = str(file_path)
    try:
        if path_str.lower().endswith(('.mp4', '.mp3')):
            # Only the exit status matters. Discarding ffmpeg's output avoids
            # buffering it and avoids text-decoding errors that would wrongly
            # mark a good file as corrupt.
            result = subprocess.run(
                ["ffmpeg", "-i", path_str, "-f", "null", "-"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
            return result.returncode == 0
        # For other files, just check if they can be opened and are non-empty
        with open(file_path, 'rb') as f:
            f.seek(0, os.SEEK_END)
            return f.tell() > 0
    except Exception as e:
        logging.error(f"File integrity check failed for {file_path}: {str(e)}")
        return False
# Function to transcribe MP3 using NVIDIA Parakeet TDT | |
_PARAKEET_MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2"
# Loaded ASR models keyed by model name; each model is loaded at most once per process.
_asr_model_cache = {}


def _get_asr_model(model_name=_PARAKEET_MODEL_NAME):
    """Return the NeMo ASR model ``model_name``, loading it on first use only."""
    model = _asr_model_cache.get(model_name)
    if model is None:
        import nemo.collections.asr as nemo_asr  # heavy import, deferred until needed
        logging.info("Loading NVIDIA Parakeet TDT model...")
        model = nemo_asr.models.ASRModel.from_pretrained(model_name=model_name)
        _asr_model_cache[model_name] = model
    return model


def transcribe_audio(mp3_path, transcript_path, model_name=_PARAKEET_MODEL_NAME):
    """Transcribe an audio file with an NVIDIA NeMo ASR model and save the transcript.

    The output file contains the transcript text, then a
    ``--- Word Timestamps ---`` section with one ``start - end: word`` line per
    word. Start and end use 2 decimals, in whatever unit the model reports.

    The model is cached across calls, so a batch run loads it only once. The text
    is written to ``<transcript_path>.part`` and renamed at the end, because
    callers treat an existing transcript file as finished.

    Args:
        mp3_path: Path of the audio file to transcribe.
        transcript_path: Destination text file (UTF-8).
        model_name: NeMo/Hugging Face model identifier; defaults to Parakeet TDT 0.6B v2.

    Returns:
        True on success, False otherwise. Errors are logged, not raised.
    """
    temp_path = f"{transcript_path}.part"
    try:
        logging.info(f"Transcribing {mp3_path} with NVIDIA Parakeet TDT model")
        asr_model = _get_asr_model(model_name)

        logging.info(f"Running transcription on {mp3_path}")
        output = asr_model.transcribe([str(mp3_path)], timestamps=True)
        hypothesis = output[0]
        word_timestamps = hypothesis.timestamp['word']

        with open(temp_path, "w", encoding="utf-8") as f:
            f.write(hypothesis.text)
            f.write("\n\n--- Word Timestamps ---\n")
            f.writelines(
                f"{info['start']:.2f} - {info['end']:.2f}: {info['word']}\n"
                for info in word_timestamps
            )
        os.replace(temp_path, transcript_path)
        logging.info(f"Transcription completed: {transcript_path}")
        return True
    except Exception as e:
        logging.error(f"Error transcribing file {mp3_path}: {str(e)}")
        return False
    finally:
        # Best-effort removal of a partial transcript left by a failure.
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass
# Function to process a single video | |
# Pipeline stages in execution order. The stage saved in ProcessingState is the
# one a video was last working on, so a re-run resumes from there.
_PIPELINE_STAGES = ("downloading", "converting", "transcribing")


def process_video(row, state):
    """Run one CSV row through download -> MP3 conversion -> transcription.

    Args:
        row: Mapping (dict or pandas row) with at least ``name`` (the MP4 file
            name) and ``url`` entries.
        state: ProcessingState used to skip finished videos and resume at the
            last recorded stage.

    Returns:
        True if the video is (now or already) fully processed; False if it was
        skipped as invalid or "broken", or if any step failed. Errors are logged,
        not raised, so one bad row cannot abort the whole batch.
    """
    name = row.get('name')
    try:
        # pandas delivers an empty cell as NaN (a float); only a non-empty string
        # can be turned into the output paths below.
        if not isinstance(name, str) or not name.strip():
            logging.warning(f"Skipping row with invalid video name: {name!r}")
            return False

        # Skip files that are marked as "broken"
        if "broken" in name.lower():
            logging.warning(f"Skipping broken file: {name}")
            return False

        # Skip if already completed
        if state.is_completed(name):
            logging.info(f"Video {name} already fully processed, skipping...")
            return True

        url = row['url']

        # Define output paths.
        # NOTE(review): name comes straight from the CSV; a value containing path
        # separators would place files outside these folders.
        mp4_path = Path("downloads") / name
        base_name = name.rsplit('.', 1)[0]  # Remove extension
        mp3_path = Path("mp3") / f"{base_name}.mp3"
        transcript_path = Path("transcripts") / f"{base_name}.txt"

        # Resume at the recorded stage. A missing or unrecognised stage starts
        # from the beginning instead of skipping every step and marking the
        # video complete.
        current_stage = state.get_stage(name)
        if current_stage in _PIPELINE_STAGES:
            start = _PIPELINE_STAGES.index(current_stage)
        else:
            start = 0

        # Step 1: Download the MP4 file (if not already done)
        if start <= 0:
            state.mark_stage(name, "downloading")
            if not mp4_path.exists() or not verify_file_integrity(mp4_path):
                if not download_file(url, mp4_path, state, name):
                    return False
            else:
                logging.info(f"MP4 file already exists and verified: {mp4_path}")

        # Step 2: Convert to MP3 (if not already done)
        if start <= 1:
            state.mark_stage(name, "converting")
            if not mp3_path.exists() or not verify_file_integrity(mp3_path):
                if not convert_to_mp3(str(mp4_path), str(mp3_path)):
                    return False
            else:
                logging.info(f"MP3 file already exists and verified: {mp3_path}")

        # Step 3: Generate transcript (if not already done); every stage reaches this
        state.mark_stage(name, "transcribing")
        if not transcript_path.exists():
            if not transcribe_audio(str(mp3_path), str(transcript_path)):
                return False
        else:
            logging.info(f"Transcript already exists: {transcript_path}")

        # Step 4: Clean up - remove the MP4 file (failure here is non-fatal)
        if mp4_path.exists():
            try:
                os.remove(mp4_path)
                logging.info(f"Removed original MP4 file: {mp4_path}")
            except Exception as e:
                logging.warning(f"Failed to remove MP4 file {mp4_path}: {str(e)}")

        # Mark as completed
        state.mark_completed(name)
        logging.info(f"Successfully processed video: {name}")
        return True
    except Exception as e:
        logging.error(f"Error processing {name}: {str(e)}")
        return False
# Main function | |
def _create_csv_from_text(csv_file, source_path="videos_data.txt"):
    """Write ``csv_file`` from a comma-separated text dump, replacing its header row.

    The ``csv`` module parses the source, so quoted fields containing commas
    stay intact. Blank lines are dropped, and the first non-blank line (the
    dump's own header) is replaced by this script's column names.
    """
    with open(source_path, "r", encoding="utf-8", newline="") as src:
        rows = [fields for fields in csv.reader(src) if fields]
    with open(csv_file, "w", encoding="utf-8", newline="") as dst:
        writer = csv.writer(dst)
        writer.writerow(["name", "url", "last_modified", "size", "is_directory", "directory"])
        writer.writerows(rows[1:])  # Skip the source's own header


def main(csv_file):
    """Process every video listed in ``csv_file``.

    If ``csv_file`` does not exist but ``videos_data.txt`` does, the CSV is first
    generated from that text file. Per-video failures are counted and logged but
    do not stop the run.

    Returns:
        True when the whole list was processed. False when the CSV cannot be
        read, lacks the required ``name``/``url`` columns, or processing is
        interrupted or hits an unexpected error.
    """
    logger = setup_logging()
    logger.info("Starting video processing pipeline")

    # Add signal handlers for graceful interruption
    add_signal_handlers()

    # Create directory structure
    create_directories()

    # Initialize state tracker
    state = ProcessingState()

    # Read (creating it first if needed) the CSV file
    try:
        if not os.path.exists(csv_file) and os.path.exists("videos_data.txt"):
            logger.info(f"CSV file {csv_file} not found, but found videos_data.txt, creating CSV...")
            _create_csv_from_text(csv_file)
            logger.info(f"Created CSV file {csv_file}")
        df = pd.read_csv(csv_file)
    except Exception as e:
        logger.error(f"Error reading CSV file {csv_file}: {str(e)}")
        return False

    missing_columns = {"name", "url"} - set(df.columns)
    if missing_columns:
        logger.error(f"CSV file {csv_file} is missing required column(s): {', '.join(sorted(missing_columns))}")
        return False

    total_videos = len(df)
    logger.info(f"Found {total_videos} videos in CSV file")

    successful = 0
    failed = 0
    skipped = 0
    try:
        # enumerate gives a 1-based position even if the DataFrame index is not 0..n-1
        for position, (_, row) in enumerate(df.iterrows(), start=1):
            name = row['name']
            logger.info(f"Processing video {position}/{total_videos}: {name}")

            # Skip if already completed (counted separately from successes)
            if state.is_completed(name):
                logger.info(f"Video {name} already fully processed, skipping...")
                skipped += 1
                continue

            if process_video(row, state):
                successful += 1
            else:
                failed += 1
    except KeyboardInterrupt:
        logger.warning("Processing interrupted by user")
        logger.info("You can resume processing by running the script again")
        return False
    except Exception:
        logger.exception(f"Unexpected error while processing videos from {csv_file}")
        return False

    # Log summary
    logger.info("Processing completed")
    logger.info(f"Total videos: {total_videos}")
    logger.info(f"Successfully processed: {successful}")
    logger.info(f"Failed: {failed}")
    logger.info(f"Skipped (already processed): {skipped}")
    return True
if __name__ == "__main__":
    # Optional first argument: path of the CSV to process.
    csv_file = sys.argv[1] if len(sys.argv) > 1 else "videos.csv"  # Default CSV filename
    # Report main()'s outcome as the exit status so scripts/schedulers notice failures.
    sys.exit(0 if main(csv_file) else 1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment