@joshbedo
Created June 16, 2025 16:38
main.py
import enum
import glob
import json
import pathlib
import pickle
import shutil
import subprocess
import time
import uuid
import boto3
import cv2
import numpy as np
import pysubs2
from tqdm import tqdm
import whisperx
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
import modal
from pydantic import BaseModel
import os
import ffmpegcv
from google import genai
from openai import OpenAI
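
# Pipeline overview: download a podcast video from S3, transcribe it with
# WhisperX (word-level timestamps), ask Gemini for viral-worthy 30-60s
# moments, then for each moment cut the segment, run active-speaker
# detection, render a 1080x1920 vertical video with burned-in subtitles,
# and upload the result back to S3.
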
class ProcessVideoRequest(BaseModel):
    s3_key: str

image = (modal.Image.from_registry(
        "nvidia/cuda:12.4.0-devel-ubuntu22.04", add_python="3.12")
    .apt_install(["ffmpeg", "libgl1-mesa-glx", "wget", "libcudnn8", "libcudnn8-dev"])
    .pip_install_from_requirements("requirements.txt")
    .run_commands(["mkdir -p /usr/share/fonts/truetype/custom",
                   "wget -O /usr/share/fonts/truetype/custom/Anton-Regular.ttf https://github.com/google/fonts/raw/main/ofl/anton/Anton-Regular.ttf",
                   "fc-cache -f -v"])
    .add_local_dir("asd", "/asd", copy=True))

app = modal.App("cliptastic", image=image, secrets=[modal.Secret.from_name("cliptastic-secret")])

volume = modal.Volume.from_name(
    "cliptastic-model-cache", create_if_missing=True
)
mount_path = "/root/.cache/torch"
auth_scheme = HTTPBearer()
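
# create_vertical_video: renders a 9:16 (1080x1920) clip. Frames with a
# confidently detected active speaker are cropped around that face; frames
# without one fall back to a centered resize over a blurred background.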
def create_vertical_video(tracks, scores, pyframes_path, pyavi_path, audio_path, output_path, framerate=25):
    target_width = 1080
    target_height = 1920

    flist = glob.glob(os.path.join(pyframes_path, "*.jpg"))
    flist.sort()

    faces = [[] for _ in range(len(flist))]

    # Smooth each track's speaker score over a +/-30-frame window so the
    # selected face doesn't flicker from frame to frame.
    for tidx, track in enumerate(tracks):
        score_array = scores[tidx]
        for fidx, frame in enumerate(track["track"]["frame"].tolist()):
            slice_start = max(fidx - 30, 0)
            slice_end = min(fidx + 30, len(score_array))
            score_slice = score_array[slice_start:slice_end]
            avg_score = float(np.mean(score_slice) if len(score_slice) > 0 else 0)

            faces[frame].append({"track": tidx, "score": avg_score,
                                 "s": track["proc_track"]["s"][fidx],
                                 "x": track["proc_track"]["x"][fidx],
                                 "y": track["proc_track"]["y"][fidx]})

    temp_video_path = os.path.join(pyavi_path, "video_only.mp4")

    vout = None
    for fidx, fname in tqdm(enumerate(flist), total=len(flist), desc="Creating vertical video"):
        img = cv2.imread(fname)
        if img is None:
            continue

        current_faces = faces[fidx]

        max_score_face = max(current_faces, key=lambda face: face['score']) if current_faces else None

        if max_score_face and max_score_face['score'] < 0:
            max_score_face = None

        if vout is None:
            vout = ffmpegcv.VideoWriterNV(
                file=temp_video_path,
                codec=None,
                fps=framerate,
                resize=(target_width, target_height)
            )

        if max_score_face:
            mode = "crop"
        else:
            mode = "resize"

        if mode == "resize":
            scale = target_width / img.shape[1]
            resized_height = int(img.shape[0] * scale)
            resized_image = cv2.resize(img, (target_width, resized_height), interpolation=cv2.INTER_AREA)

            scale_for_bg = max(
                target_width / img.shape[1], target_height / img.shape[0])
            bg_width = int(img.shape[1] * scale_for_bg)
            bg_height = int(img.shape[0] * scale_for_bg)

            blurred_background = cv2.resize(img, (bg_width, bg_height))
            blurred_background = cv2.GaussianBlur(blurred_background, (121, 121), 0)

            crop_x = (bg_width - target_width) // 2
            crop_y = (bg_height - target_height) // 2
            blurred_background = blurred_background[crop_y:crop_y + target_height, crop_x:crop_x + target_width]

            # if resized_height <= target_height:
            center_y = (target_height - resized_height) // 2
            blurred_background[center_y:center_y + resized_height, :] = resized_image

            vout.write(blurred_background)
        elif mode == "crop":
            scale = target_height / img.shape[0]
            resized_image = cv2.resize(img, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
            frame_width = resized_image.shape[1]

            center_x = int(
                max_score_face["x"] * scale if max_score_face else frame_width // 2)
            top_x = max(min(center_x - target_width // 2, frame_width - target_width), 0)
            image_cropped = resized_image[0:target_height, top_x:top_x + target_width]

            vout.write(image_cropped)

    if vout:
        vout.release()

    ffmpeg_command = (f"ffmpeg -y -i {temp_video_path} -i {audio_path} "
                      f"-c:v h264 -preset fast -crf 23 -c:a aac -b:a 128k "
                      f"{output_path}")
    subprocess.run(ffmpeg_command, shell=True, check=True, text=True)
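
# create_subtitles_with_ffmpeg: groups word-level transcript segments into
# subtitle lines of at most `max_words` words, styles them as ASS events
# (Anton font, bottom-centered), and burns them in with ffmpeg's `ass` filter.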
def create_subtitles_with_ffmpeg(transcript_segments: list, clip_start: float, clip_end: float, clip_video_path: str, output_path: str, max_words: int = 5):
    temp_dir = os.path.dirname(output_path)
    subtitle_path = os.path.join(temp_dir, "temp_subtitles.ass")

    clip_segments = [segment for segment in transcript_segments
                     if segment.get("start") is not None
                     and segment.get("end") is not None
                     and segment.get("end") > clip_start
                     and segment.get("start") < clip_end]

    subtitles = []
    current_words = []
    current_start = None
    current_end = None

    for segment in clip_segments:
        word = segment.get("word", "").strip()
        seg_start = segment.get("start")
        seg_end = segment.get("end")

        if not word or seg_start is None or seg_end is None:
            continue

        # Shift timestamps so they are relative to the clip, not the full video.
        start_rel = max(0.0, seg_start - clip_start)
        end_rel = max(0.0, seg_end - clip_start)

        if end_rel <= 0:
            continue

        if not current_words:
            current_start = start_rel
            current_end = end_rel
            current_words = [word]
        elif len(current_words) >= max_words:
            subtitles.append((current_start, current_end, ' '.join(current_words)))
            current_words = [word]
            current_start = start_rel
            current_end = end_rel
        else:
            current_words.append(word)
            current_end = end_rel

    if current_words:
        subtitles.append(
            (current_start, current_end, ' '.join(current_words))
        )

    subs = pysubs2.SSAFile()
    # TODO: maybe burn animated text into the video instead of doing it this way - for tiktok style
    subs.info["WrapStyle"] = 0
    subs.info["ScaledBorderAndShadow"] = "yes"
    subs.info["PlayResX"] = 1080
    subs.info["PlayResY"] = 1920
    subs.info["ScriptType"] = "v4.00+"

    style_name = "Default"
    new_style = pysubs2.SSAStyle()
    new_style.fontname = "Anton"
    new_style.fontsize = 140
    new_style.primarycolor = pysubs2.Color(255, 255, 255)
    new_style.outline = 2.0
    new_style.shadow = 2.0
    new_style.shadowcolor = pysubs2.Color(0, 0, 0, 128)
    new_style.alignment = 2
    new_style.marginl = 50
    new_style.marginr = 50
    new_style.marginv = 50
    new_style.spacing = 0.0

    subs.styles[style_name] = new_style

    for start, end, text in subtitles:
        start_time = pysubs2.make_time(s=start)
        end_time = pysubs2.make_time(s=end)
        line = pysubs2.SSAEvent(start=start_time, end=end_time, text=text, style=style_name)
        subs.events.append(line)

    subs.save(subtitle_path)

    ffmpeg_cmd = (f"ffmpeg -y -i {clip_video_path} -vf \"ass={subtitle_path}\" "
                  f"-c:v h264 -preset fast -crf 23 {output_path}")
    subprocess.run(ffmpeg_cmd, shell=True, check=True)
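
# process_clip: end-to-end processing for one clip - cut the segment from the
# source video, extract 16 kHz mono audio, run the Columbia active-speaker
# detection script (in /asd), build the vertical video plus subtitles, and
# upload the finished MP4 next to the source object in S3.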
def process_clip(base_dir: pathlib.Path, original_video_path: str, s3_key: str, start_time: float, end_time: float, clip_index: int, transcript_segments: list):
    clip_name = f"clip_{clip_index}"
    s3_key_dir = os.path.dirname(s3_key)
    output_s3_key = f"{s3_key_dir}/{clip_name}.mp4"
    print(f"Output s3 key: {output_s3_key}")

    clip_dir = base_dir / clip_name
    clip_dir.mkdir(parents=True, exist_ok=True)

    # Segment path: original clip from start to end
    clip_segment_path = clip_dir / f"{clip_name}_segment.mp4"
    vertical_mp4_path = clip_dir / "pyavi" / "video_out_vertical.mp4"
    subtitle_output_path = clip_dir / "pyavi" / "video_with_subtitles.mp4"

    (clip_dir / "pywork").mkdir(exist_ok=True)
    pyframes_path = clip_dir / "pyframes"
    pyavi_path = clip_dir / "pyavi"
    audio_path = clip_dir / "pyavi" / "audio.wav"

    pyframes_path.mkdir(exist_ok=True)
    pyavi_path.mkdir(exist_ok=True)

    duration = end_time - start_time
    cut_command = (f"ffmpeg -i {original_video_path} -ss {start_time} -t {duration} "
                   f"{clip_segment_path}")
    subprocess.run(cut_command, shell=True, check=True, capture_output=True, text=True)

    extract_cmd = f"ffmpeg -i {clip_segment_path} -vn -acodec pcm_s16le -ar 16000 -ac 1 {audio_path}"
    subprocess.run(extract_cmd, shell=True, check=True, capture_output=True)

    shutil.copy(clip_segment_path, base_dir / f"{clip_name}.mp4")

    columbia_command = (f"python Columbia_test.py --videoName {clip_name} "
                        f"--videoFolder {str(base_dir)} "
                        f"--pretrainModel weight/finetuning_TalkSet.model")

    columbia_start_time = time.time()
    subprocess.run(columbia_command, cwd="/asd", shell=True)
    columbia_end_time = time.time()
    print(f"Columbia script completed in {columbia_end_time - columbia_start_time:.2f} seconds")

    tracks_path = clip_dir / "pywork" / "tracks.pckl"
    scores_path = clip_dir / "pywork" / "scores.pckl"
    if not tracks_path.exists() or not scores_path.exists():
        raise FileNotFoundError("Tracks or scores not found for clip")

    with open(tracks_path, "rb") as f:
        tracks = pickle.load(f)

    with open(scores_path, "rb") as f:
        scores = pickle.load(f)

    cvv_start_time = time.time()
    create_vertical_video(
        tracks, scores, pyframes_path, pyavi_path, audio_path, vertical_mp4_path
    )
    cvv_end_time = time.time()
    print(f"Clip {clip_index} vertical video creation time: {cvv_end_time - cvv_start_time:.2f} seconds")

    create_subtitles_with_ffmpeg(
        transcript_segments, start_time, end_time, vertical_mp4_path, subtitle_output_path, max_words=5
    )

    s3_client = boto3.client("s3")
    s3_client.upload_file(str(subtitle_output_path), "cliptastic", output_s3_key)
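
# Models are loaded once per container in @modal.enter() and reused across
# requests; the volume mounted at /root/.cache/torch persists model weights
# between cold starts.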
@app.cls(gpu="L40S", timeout=900, retries=0, scaledown_window=20, secrets=[modal.Secret.from_name("cliptastic-secret")], volumes={mount_path: volume})
class AiPodcastClipper:
@modal.enter()
def load_model(self):
print("Loading models")
self.whisperx_model = whisperx.load_model("large-v2", device="cuda", compute_type="float16")
self.alignment_model, self.metadata = whisperx.load_align_model(
language_code="en",
device="cuda"
)
print("Transcription models loaded...")
print("Creating gemini client...")
# Initialize clients as None, will be created when needed
self.gemini_client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
# self.openai_client = None
print("Created gemini client...")

    # def _get_gemini_client(self):
    #     if self.gemini_client is None:
    #         try:
    #             print("Initializing Gemini client...")
    #             self.gemini_client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
    #             print("Gemini client initialized successfully")
    #         except Exception as e:
    #             print(f"Failed to initialize Gemini client: {str(e)}")
    #             return None
    #     return self.gemini_client

    # def _get_openai_client(self):
    #     if self.openai_client is None:
    #         try:
    #             print("Initializing OpenAI client...")
    #             self.openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    #             print("OpenAI client initialized successfully")
    #         except Exception as e:
    #             print(f"Failed to initialize OpenAI client: {str(e)}")
    #             return None
    #     return self.openai_client

    def transcribe_video(self, base_dir: pathlib.Path, video_path: pathlib.Path) -> str:
        audio_path = base_dir / "audio.wav"
        extract_cmd = f"ffmpeg -i {video_path} -vn -acodec pcm_s16le -ar 16000 -ac 1 {audio_path}"
        subprocess.run(extract_cmd, shell=True, check=True, capture_output=True)

        print("Starting transcription with WhisperX...")
        start_time = time.time()

        audio = whisperx.load_audio(str(audio_path))
        result = self.whisperx_model.transcribe(audio, batch_size=16)

        result = whisperx.align(
            result["segments"],
            self.alignment_model,
            self.metadata,
            audio,
            device="cuda",
            return_char_alignments=False
        )

        duration = time.time() - start_time
        print(f"Transcription and alignment took {duration:.2f} seconds")

        segments = []
        if "word_segments" in result:
            for word_segment in result["word_segments"]:
                segments.append({
                    "start": word_segment["start"],
                    "end": word_segment["end"],
                    "word": word_segment["word"],
                })

        # print(json.dumps(result, indent=2))
        return json.dumps(segments)
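
    # identify_moments asks Gemini for clip boundaries and expects raw JSON
    # like [{"start": 12.5, "end": 58.2}, ...] (possibly wrapped in a
    # ```json fence, which process_video strips before parsing).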
    def identify_moments(self, transcript: list):
        prompt = """
        You are analyzing a podcast video transcript. Each word in the transcript has an associated start and end timestamp. Your goal is to extract viral-worthy clips that are between 30 and 60 seconds long, with a preference for clips that are closer to 40–60 seconds.

        These clips should contain engaging moments such as:
        - Humor (e.g., jokes, funny stories, unexpected responses)
        - Drama (e.g., personal confessions, emotional tension, conflict)
        - Surprise or twist (e.g., an unexpected answer, plot twist, punchline), or other pattern interrupts
        - Question → Answer exchanges with clear payoff
        - Story arcs (a mini-story with setup, conflict, and resolution)

        Please adhere to the following rules:
        - Each clip must start and end exactly at sentence boundaries based on the timestamps provided. Do not modify or infer new timestamps.
        - No overlapping between clips.
        - Start and end timestamps of the clips should align perfectly with the sentence boundaries in the transcript.
        - Each clip should start with a clear setup or question, and end with the conclusion or punchline.
        - The ideal clip should feel complete and self-contained, suitable for viral distribution on social media.
        - Aim to generate longer clips between 40-60 seconds, and include as much content from the context as viable.
        - Format the output as a list of JSON objects, each representing a clip with 'start' and 'end' timestamps: [{"start": seconds, "end": seconds}, ...]. The output should always be readable by the Python json.loads function.

        Avoid including:
        - Greetings, introductions, goodbyes, and sponsor messages.
        - Flat or mundane dialogue with no emotional or intellectual impact.

        If there are no valid clips to extract, the output should be an empty list [], in JSON format, also readable by json.loads() in Python.

        The transcript is as follows:\n\n""" + str(transcript)

        response = self.gemini_client.models.generate_content(model="gemini-2.5-flash-preview-04-17", contents=prompt)
        print(f"Identified moments response: {response.text}")
        return response.text

        # # Try Gemini first
        # gemini_client = self._get_gemini_client()
        # if gemini_client:
        #     try:
        #         response = gemini_client.models.generate_content(
        #             model="gemini-2.5-flash-preview-04-17",
        #             contents=prompt + json.dumps(transcript)
        #         )
        #         print(f"Identified moments response from Gemini: {response.text}")
        #         return response.text
        #     except Exception as e:
        #         print(f"Gemini request failed with error: {str(e)}. Falling back to OpenAI...")

        # # Fallback to OpenAI
        # openai_client = self._get_openai_client()
        # if openai_client:
        #     try:
        #         response = openai_client.chat.completions.create(
        #             model="gpt-4-turbo-preview",
        #             messages=[
        #                 {"role": "system", "content": "You are a helpful assistant that analyzes podcast transcripts to find viral-worthy clips."},
        #                 {"role": "user", "content": prompt + json.dumps(transcript)}
        #             ],
        #             response_format={"type": "json_object"}
        #         )
        #         print(f"Identified moments response from OpenAI: {response.choices[0].message.content}")
        #         return response.choices[0].message.content
        #     except Exception as e:
        #         print(f"OpenAI request failed with error: {str(e)}")

        # print("Both Gemini and OpenAI failed. Returning empty list.")
        # return "[]"  # Return empty list if both services fail
@modal.fastapi_endpoint(method="POST")
def process_video(self, request: ProcessVideoRequest, token: HTTPAuthorizationCredentials = Depends(auth_scheme)):
s3_key = request.s3_key
if isinstance(s3_key, str) and s3_key.startswith('{'):
try:
# If s3_key is a JSON string, parse it
s3_key = json.loads(s3_key)['s3_key']
except json.JSONDecodeError:
print("Warning: s3_key appears to be malformed JSON")
pass
# if token.credentia ls != os.environ["AUTH_TOKEN"]:
# raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect bearer token", headers={"WWW-Authenticate": "Bearer"})
run_id = str(uuid.uuid4())
base_dir = pathlib.Path("/tmp") / run_id
base_dir.mkdir(parents=True, exist_ok=True)
print("Files in /tmp:", os.listdir("/tmp"))
print("Environment variables:", dict(os.environ))
# Debug AWS credentials
print("AWS Access Key ID exists:", "AWS_ACCESS_KEY_ID" in os.environ)
print("AWS Secret Access Key exists:", "AWS_SECRET_ACCESS_KEY" in os.environ)
print("AWS Region:", os.environ.get("AWS_DEFAULT_REGION", "not set"))
# Download video file
video_path = base_dir / "input.mp4"
s3_client = boto3.client("s3")
print(f"Attempting to download file from bucket: cliptastic, key: {s3_key}")
try:
# First check if the object exists
s3_client.head_object(Bucket="cliptastic", Key=s3_key)
print("File exists in S3, proceeding with download...")
s3_client.download_file("cliptastic", s3_key, str(video_path))
print("Download completed successfully")
except Exception as e:
print(f"Error accessing S3: {str(e)}")
print(f"Full error details: {type(e).__name__}")
raise
# Transcription
transcript_segments_json = self.transcribe_video(base_dir, video_path)
transcript_segments = json.loads(transcript_segments_json)
# Identify moments for clips
print("Identifying clip moments")
identified_moments_raw = self.identify_moments(transcript_segments)
cleaned_json_string = identified_moments_raw.strip()
if cleaned_json_string.startswith("```json"):
cleaned_json_string = cleaned_json_string[len("```json"):].strip()
if cleaned_json_string.endswith("```"):
cleaned_json_string = cleaned_json_string[:-len("```")].strip()
clip_moments = json.loads(cleaned_json_string)
if not clip_moments or not isinstance(clip_moments, list):
print("Error: Identified moments is not a list")
clip_moments = []
print(clip_moments)
# grab 3 first clips - change to 10 or remove in prod
# Process Clips
for index, moment in enumerate(clip_moments[:1]):
if "start" in moment and "end" in moment:
print("Processing clip" + str(index) + " from " + str(moment["start"]) + " to " + str(moment["end"]))
process_clip(base_dir, video_path, s3_key, moment["start"], moment["end"], index, transcript_segments)
if base_dir.exists():
print(f"Cleaning up temp dir after {base_dir}")
shutil.rmtree(base_dir, ignore_errors=True)
# print(os.listdir(base_dir))
# print("Processing video " + request.s3_key)
@app.local_entrypoint()
def main():
    import requests

    ai_podcast_clipper = AiPodcastClipper()
    url = ai_podcast_clipper.process_video.web_url

    payload = {
        "s3_key": "test1/ac75min.mp4"
    }

    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer 123"
    }

    response = requests.post(url, json=payload,
                             headers=headers)
    response.raise_for_status()
    result = response.json()
    print(result)
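
# Assuming the standard Modal CLI workflow: `modal deploy main.py` publishes
# the web endpoint, and `modal run main.py` executes this local entrypoint.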