Skip to content

Instantly share code, notes, and snippets.

@bigsnarfdude
Last active May 19, 2025 00:45
Show Gist options
  • Save bigsnarfdude/916994ebe132491826c223554f8e0dbd to your computer and use it in GitHub Desktop.
Save bigsnarfdude/916994ebe132491826c223554f8e0dbd to your computer and use it in GitHub Desktop.
transcribe.py
#!/usr/bin/env python3
"""
Video Processing Script
This script processes a list of video files from a CSV file:
1. Downloads the MP4 videos from URLs
2. Converts the videos to MP3 format
3. Generates transcripts from the MP3 files
4. Stores files in appropriate folders
5. Deletes the original MP4 files after processing
6. Logs all activities
Features:
- Robust resume capability if interrupted
- Progress tracking and state persistence
- Partial file download resumption
- Comprehensive error handling and logging
Requirements:
- Python 3.8+
- ffmpeg: for video-to-audio conversion
- whisper: for speech-to-text transcription
Install required packages with:
pip install requests tqdm pandas openai-whisper pydub logging
"""
import os
import sys
import csv
import time
import json
import requests
import logging
import subprocess
import pandas as pd
from tqdm import tqdm
from datetime import datetime
from pathlib import Path
import shutil
import hashlib
import pickle
# Removed whisper import since we're using Parakeet
# Create a state tracker class to manage resumption
class ProcessingState:
"""Class to track and persist processing state for resuming operations."""
def __init__(self, state_file_path="processing_state.pkl"):
self.state_file_path = state_file_path
self.completed_videos = set()
self.in_progress = {} # Maps video name to current processing stage
self.download_sizes = {} # Tracks partial downloads by size
self.load_state()
def load_state(self):
"""Load previous state from file if it exists."""
if os.path.exists(self.state_file_path):
try:
with open(self.state_file_path, 'rb') as f:
state = pickle.load(f)
self.completed_videos = state.get('completed_videos', set())
self.in_progress = state.get('in_progress', {})
self.download_sizes = state.get('download_sizes', {})
logging.info(f"Loaded previous state: {len(self.completed_videos)} completed, "
f"{len(self.in_progress)} in progress")
except Exception as e:
logging.error(f"Error loading state file: {e}")
# Create backup of corrupted state file
if os.path.exists(self.state_file_path):
backup_path = f"{self.state_file_path}.bak.{int(time.time())}"
shutil.copy(self.state_file_path, backup_path)
logging.info(f"Created backup of corrupted state file: {backup_path}")
def save_state(self):
"""Save current state to file."""
try:
state = {
'completed_videos': self.completed_videos,
'in_progress': self.in_progress,
'download_sizes': self.download_sizes
}
with open(self.state_file_path, 'wb') as f:
pickle.dump(state, f)
except Exception as e:
logging.error(f"Error saving state file: {e}")
def mark_completed(self, video_name):
"""Mark a video as completely processed."""
if video_name in self.in_progress:
del self.in_progress[video_name]
self.completed_videos.add(video_name)
self.save_state()
def mark_stage(self, video_name, stage):
"""Mark the current processing stage of a video."""
self.in_progress[video_name] = stage
self.save_state()
def update_download_size(self, video_name, size):
"""Update the current download size for partial download resumption."""
self.download_sizes[video_name] = size
self.save_state()
def get_download_size(self, video_name):
"""Get the current download size for a video."""
return self.download_sizes.get(video_name, 0)
def is_completed(self, video_name):
"""Check if a video has been completely processed."""
return video_name in self.completed_videos
def get_stage(self, video_name):
"""Get the current processing stage for a video."""
return self.in_progress.get(video_name, None)
# Set up logging
def setup_logging():
"""Configure logging to both file and console."""
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = log_dir / f"video_processing_{timestamp}.log"
# Create handlers
file_handler = logging.FileHandler(log_file)
console_handler = logging.StreamHandler(sys.stdout)
# Configure logging format
log_format = '%(asctime)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(log_format)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Set up the root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
return root_logger
# Add a signal handler to gracefully handle interruptions
def add_signal_handlers():
"""Add signal handlers for graceful interruption."""
import signal
def signal_handler(sig, frame):
logging.warning(f"Received signal {sig}, shutting down gracefully...")
logging.info("You can resume processing by running the script again")
sys.exit(0)
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # Termination signal
# Function to create directory structure
def create_directories():
"""Create the necessary directory structure for the project."""
directories = ["downloads", "mp3", "transcripts", "logs"]
for directory in directories:
Path(directory).mkdir(exist_ok=True)
logging.info(f"Created directory: {directory}")
# Create a .gitignore file to prevent committing large files
gitignore_path = Path(".gitignore")
if not gitignore_path.exists():
with open(gitignore_path, "w") as f:
f.write("# Ignore large files and temporary files\n")
f.write("downloads/\n")
f.write("*.mp4\n")
f.write("*.part\n")
f.write("*.temp\n")
f.write("processing_state.pkl\n")
logging.info("Created .gitignore file")
# Function to download a file with progress bar and resume capability
def download_file(url, target_path, state, video_name):
"""Download a file from URL with progress reporting and resume capability."""
try:
logging.info(f"Starting download: {url}")
temp_path = f"{target_path}.part"
# Get the size of the temporary file if it exists
current_size = 0
if os.path.exists(temp_path):
current_size = os.path.getsize(temp_path)
# If there's a mismatch between our state and the actual file, trust the file
if current_size != state.get_download_size(video_name):
logging.warning(f"Download size mismatch for {video_name}. File: {current_size}, State: {state.get_download_size(video_name)}")
state.update_download_size(video_name, current_size)
# Set up headers for resume
headers = {}
if current_size > 0:
headers['Range'] = f'bytes={current_size}-'
logging.info(f"Resuming download from byte {current_size}")
# Make the request with the resume headers
response = requests.get(url, stream=True, headers=headers)
# If we tried to resume but the server doesn't support it
if current_size > 0 and response.status_code == 200:
logging.warning("Server doesn't support resume. Starting from beginning.")
current_size = 0
if os.path.exists(temp_path):
os.remove(temp_path)
# If we're resuming and the server acknowledged it
if current_size > 0 and response.status_code == 206:
logging.info("Server supports resume. Continuing download.")
response.raise_for_status()
# Get the total file size
if 'content-length' in response.headers:
file_size = int(response.headers.get('content-length', 0))
total_size = current_size + file_size
else:
total_size = 0 # Unknown size
# If content-range header is present, use it to determine total size
if 'content-range' in response.headers:
content_range = response.headers.get('content-range')
try:
total_size = int(content_range.split('/')[-1])
except (IndexError, ValueError):
pass
block_size = 1024 * 8 # 8 KB
# Initialize progress bar
progress_bar = tqdm(
total=total_size,
initial=current_size,
unit='iB',
unit_scale=True,
desc=f"Downloading {Path(target_path).name}"
)
# Open the file in append mode if resuming, otherwise in write mode
mode = 'ab' if current_size > 0 else 'wb'
with open(temp_path, mode) as file:
for data in response.iter_content(block_size):
file.write(data)
data_len = len(data)
progress_bar.update(data_len)
current_size += data_len
# Periodically update the state to track download progress
if current_size % (1024 * 1024) == 0: # Update every 1MB
state.update_download_size(video_name, current_size)
progress_bar.close()
# Verify download completed successfully
if total_size > 0 and current_size != total_size:
logging.warning(f"Downloaded file size {current_size} does not match expected size {total_size} for {url}")
state.update_download_size(video_name, current_size)
return False
# Download completed, rename temp file to final name
os.rename(temp_path, target_path)
# Clear the download size from state since download is complete
if video_name in state.download_sizes:
del state.download_sizes[video_name]
state.save_state()
logging.info(f"Download completed: {target_path}")
return True
except requests.exceptions.ConnectionError as e:
logging.error(f"Connection error for {url}: {str(e)}")
state.update_download_size(video_name, current_size if 'current_size' in locals() else 0)
return False
except Exception as e:
logging.error(f"Download failed for {url}: {str(e)}")
state.update_download_size(video_name, current_size if 'current_size' in locals() else 0)
return False
# Function to convert MP4 to MP3
def convert_to_mp3(mp4_path, mp3_path):
"""Convert an MP4 file to MP3 format using ffmpeg."""
try:
logging.info(f"Converting {mp4_path} to MP3")
# Command for conversion
command = [
"ffmpeg",
"-i", mp4_path, # Input file
"-q:a", "0", # Best audio quality
"-map", "a", # Extract only audio
"-y", # Overwrite output file if it exists
mp3_path # Output file
]
# Execute the command
process = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Check if conversion was successful
if process.returncode == 0:
logging.info(f"Conversion successful: {mp3_path}")
return True
else:
logging.error(f"Conversion failed: {process.stderr}")
return False
except Exception as e:
logging.error(f"Error converting file {mp4_path}: {str(e)}")
return False
# Function to verify file integrity
def verify_file_integrity(file_path):
"""Verify file integrity by checking if it can be opened and read."""
try:
# For MP4 files, try to get information using ffmpeg
if str(file_path).endswith('.mp4'):
command = ["ffmpeg", "-i", str(file_path), "-f", "null", "-"]
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.returncode == 0
# For MP3 files, try to get duration
elif str(file_path).endswith('.mp3'):
command = ["ffmpeg", "-i", str(file_path), "-f", "null", "-"]
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result.returncode == 0
# For other files, just check if they can be opened
else:
with open(file_path, 'rb') as f:
f.seek(0, os.SEEK_END)
size = f.tell()
return size > 0
except Exception as e:
logging.error(f"File integrity check failed for {file_path}: {str(e)}")
return False
# Function to transcribe MP3 using NVIDIA Parakeet TDT
def transcribe_audio(mp3_path, transcript_path):
"""Transcribe an MP3 file using NVIDIA Parakeet TDT model and save the transcript."""
try:
logging.info(f"Transcribing {mp3_path} with NVIDIA Parakeet TDT model")
# Import required libraries
import nemo.collections.asr as nemo_asr
import os
# Load the NVIDIA Parakeet TDT model from Hugging Face
logging.info("Loading NVIDIA Parakeet TDT model...")
asr_model = nemo_asr.models.ASRModel.from_pretrained(model_name="nvidia/parakeet-tdt-0.6b-v2")
# Transcribe the audio
logging.info(f"Running transcription on {mp3_path}")
output = asr_model.transcribe([str(mp3_path)], timestamps=True)
# Extract the transcript text
transcript_text = output[0].text
# Get word-level timestamps for potential use
word_timestamps = output[0].timestamp['word']
# Write the transcript to a file
with open(transcript_path, "w", encoding="utf-8") as f:
f.write(transcript_text)
# Optionally, you can save timestamps in a structured format
f.write("\n\n--- Word Timestamps ---\n")
for word_info in word_timestamps:
start_time = word_info['start']
end_time = word_info['end']
word = word_info['word']
f.write(f"{start_time:.2f} - {end_time:.2f}: {word}\n")
logging.info(f"Transcription completed: {transcript_path}")
return True
except Exception as e:
logging.error(f"Error transcribing file {mp3_path}: {str(e)}")
return False
# Function to process a single video
def process_video(row, state):
"""Process a single video from download to transcript."""
# Extract information from the row
name = row['name']
url = row['url']
directory = row['directory']
# Skip files that are marked as "broken"
if "broken" in name.lower():
logging.warning(f"Skipping broken file: {name}")
return False
# Skip if already completed
if state.is_completed(name):
logging.info(f"Video {name} already fully processed, skipping...")
return True
# Define output paths
mp4_path = Path("downloads") / name
base_name = name.rsplit('.', 1)[0] # Remove extension
mp3_path = Path("mp3") / f"{base_name}.mp3"
transcript_path = Path("transcripts") / f"{base_name}.txt"
# Get the current processing stage, if any
current_stage = state.get_stage(name)
try:
# Step 1: Download the MP4 file (if not already done)
if current_stage is None or current_stage == "downloading":
state.mark_stage(name, "downloading")
if not mp4_path.exists() or not verify_file_integrity(mp4_path):
success = download_file(url, mp4_path, state, name)
if not success:
return False
else:
logging.info(f"MP4 file already exists and verified: {mp4_path}")
# Step 2: Convert to MP3 (if not already done)
if current_stage is None or current_stage in ["downloading", "converting"]:
state.mark_stage(name, "converting")
if not mp3_path.exists() or not verify_file_integrity(mp3_path):
success = convert_to_mp3(str(mp4_path), str(mp3_path))
if not success:
return False
else:
logging.info(f"MP3 file already exists and verified: {mp3_path}")
# Step 3: Generate transcript (if not already done)
if current_stage is None or current_stage in ["downloading", "converting", "transcribing"]:
state.mark_stage(name, "transcribing")
if not transcript_path.exists():
success = transcribe_audio(str(mp3_path), str(transcript_path))
if not success:
return False
else:
logging.info(f"Transcript already exists: {transcript_path}")
# Step 4: Clean up - remove the MP4 file
if mp4_path.exists():
try:
os.remove(mp4_path)
logging.info(f"Removed original MP4 file: {mp4_path}")
except Exception as e:
logging.warning(f"Failed to remove MP4 file {mp4_path}: {str(e)}")
# Mark as completed
state.mark_completed(name)
logging.info(f"Successfully processed video: {name}")
return True
except Exception as e:
logging.error(f"Error processing {name}: {str(e)}")
return False
# Main function
def main(csv_file):
"""Main function to process all videos in the CSV file."""
logger = setup_logging()
logger.info("Starting video processing pipeline")
# Add signal handlers for graceful interruption
add_signal_handlers()
# Create directory structure
create_directories()
# Initialize state tracker
state = ProcessingState()
# Read the CSV file
try:
# Check if we need to create the CSV first from the provided data
if not os.path.exists(csv_file) and os.path.exists("videos_data.txt"):
logger.info(f"CSV file {csv_file} not found, but found videos_data.txt, creating CSV...")
with open("videos_data.txt", "r") as f:
lines = f.read().strip().split("\n")
# Create CSV file from text data
with open(csv_file, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["name", "url", "last_modified", "size", "is_directory", "directory"])
for line in lines[1:]: # Skip header
if line:
writer.writerow(line.split(","))
logger.info(f"Created CSV file {csv_file}")
# Read and process the CSV file
df = pd.read_csv(csv_file)
total_videos = len(df)
logger.info(f"Found {total_videos} videos in CSV file")
# Save the list of videos to process for resumption
all_video_names = set(df['name'].tolist())
# Process each video
successful = 0
failed = 0
skipped = 0
for index, row in df.iterrows():
name = row['name']
logger.info(f"Processing video {index + 1}/{total_videos}: {name}")
# Skip if already completed
if state.is_completed(name):
logger.info(f"Video {name} already fully processed, skipping...")
skipped += 1
continue
# Process the video
if process_video(row, state):
successful += 1
else:
failed += 1
# Log summary
logger.info("Processing completed")
logger.info(f"Total videos: {total_videos}")
logger.info(f"Successfully processed: {successful}")
logger.info(f"Failed: {failed}")
logger.info(f"Skipped (already processed): {skipped}")
except KeyboardInterrupt:
logger.warning("Processing interrupted by user")
logger.info("You can resume processing by running the script again")
return False
except Exception as e:
logger.error(f"Error reading CSV file {csv_file}: {str(e)}")
return False
return True
if __name__ == "__main__":
if len(sys.argv) > 1:
csv_file = sys.argv[1]
else:
csv_file = "videos.csv" # Default CSV filename
main(csv_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment