Created
April 28, 2024 00:36
-
-
Save sonipranjal/5ef17518adda4f9bf551eb29ddf91d7c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import shutil, tempfile, modal, uuid, os, boto3, ffmpeg, logging | |
from enum import Enum | |
from pydantic import BaseModel | |
from typing import Optional, Dict, List, Any | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
# Container image for every function in this app: Debian slim with the
# ffmpeg binary (apt) plus the ffmpeg-python and boto3 packages (pip).
app_image = modal.Image.debian_slim().apt_install("ffmpeg").pip_install(
    "ffmpeg-python",
    "boto3",
)

# The app object; functions below register on it through @stub.function.
# The named secret is attached app-wide alongside the image.
stub = modal.Stub(
    "coc-video-pipeline",
    image=app_image,
    secrets=[modal.Secret.from_name("coc-video-pipeline")],
)

# INFO level so the logging.info upload messages are emitted.
logging.basicConfig(level=logging.INFO)
# NOTE(review): the scraped source lost its indentation; this assumes that
# everything down to ``bucket_name`` belongs to the with-block — confirm.
with app_image.imports():
    # Cloudflare R2 settings, read from the process environment.
    # Each value is None when its variable is not set.
    CLOUDFLARE_ACCOUNT_ID = os.environ.get("CLOUDFLARE_ACCOUNT_ID")
    CLOUDFLARE_R2_ACCESS_KEY = os.environ.get("CLOUDFLARE_R2_ACCESS_KEY")
    CLOUDFLARE_R2_SECRET = os.environ.get("CLOUDFLARE_R2_SECRET")
    CLOUDFLARE_R2_BUCKET_NAME = os.environ.get("CLOUDFLARE_R2_BUCKET_NAME")
    CLOUDFLARE_R2_DOMAIN = os.environ.get("CLOUDFLARE_R2_DOMAIN")

    # S3 client pointed at the account-specific R2 endpoint.
    s3_client = boto3.client(
        service_name="s3",
        endpoint_url=f"https://{CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com",
        aws_access_key_id=CLOUDFLARE_R2_ACCESS_KEY,
        aws_secret_access_key=CLOUDFLARE_R2_SECRET,
        region_name="auto",
    )

    # Bucket used for both downloading sources and uploading renditions.
    bucket_name = CLOUDFLARE_R2_BUCKET_NAME
class RequestBody(BaseModel):
    """Body of the POST request accepted by the ``process`` endpoint."""

    # Object key of the source video in the R2 bucket.
    video_key_r2: str
@stub.function(timeout=4000)
def process_video_in_background(video_key_r2: str):
    """Transcode a video stored in R2 into HLS renditions and upload them.

    The source object is downloaded to local disk, converted with ffmpeg into
    one HLS playlist plus segments per resolution (1080p and 720p, processed
    concurrently), and every generated file is uploaded under
    ``processed-videos/<key without extension>-<uuid4>/<resolution>/``.
    All local temporary files are removed before returning, whether or not
    processing succeeded.

    Args:
        video_key_r2: Object key of the source video in the R2 bucket.

    Returns:
        ``None`` when the pipeline ran to completion (the playlist URLs that
        were produced are printed; failures of an individual resolution are
        logged and that resolution is simply absent), or
        ``{"error": <message>}`` if downloading the source or orchestrating
        the workers raised.
    """
    # Unique destination prefix so re-processing the same key never collides.
    base_path = f"processed-videos/{video_key_r2.rsplit('.', 1)[0]}-{uuid.uuid4()}"

    def download_file_from_r2(key):
        """Download ``key`` into ./temp and return the local file path."""
        temp_dir = 'temp'
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(temp_dir, exist_ok=True)
        temp_file_path = os.path.join(temp_dir, f"{uuid.uuid4()}.tmp")
        try:
            with open(temp_file_path, 'wb') as f:
                s3_client.download_fileobj(bucket_name, key, f)
        except Exception:
            # Do not leave a partially written download behind.
            if os.path.exists(temp_file_path):
                os.remove(temp_file_path)
            raise
        return temp_file_path

    def upload_file_to_r2(file_path, key):
        """Upload one local file; return its public URL, or None on failure."""
        try:
            with open(file_path, 'rb') as f:
                s3_client.upload_fileobj(f, bucket_name, key)
            url = f"https://{CLOUDFLARE_R2_DOMAIN}/{key}"
            logging.info(f"File uploaded successfully: {url}")
            return url
        except Exception as e:
            # Best effort: the caller decides what a missing URL means.
            logging.error(f"Failed to upload file {file_path} to R2: {e}")
            return None

    def convert_video(input_path, resolution, res_label):
        """Run ffmpeg to produce an HLS rendition in a fresh temp directory.

        Returns the directory holding ``index.m3u8`` and its segments, or
        None if ffmpeg failed (the directory is removed in that case).
        """
        temp_dir = tempfile.mkdtemp()
        output_path = f'{temp_dir}/index.m3u8'
        # Segment entries in the playlist are prefixed with their final URL.
        hls_base_url = f"https://{CLOUDFLARE_R2_DOMAIN}/{base_path}/{res_label}/"
        try:
            ffmpeg.input(input_path).output(
                output_path,
                format='hls',
                hls_time=10,
                hls_list_size=0,
                start_number=0,
                vcodec='libx264',
                s=resolution,
                acodec='aac',
                audio_bitrate='320k',
                ar='48000',
                hls_base_url=hls_base_url
            ).run(overwrite_output=True)
            print("FFmpeg processing completed successfully.")
            return temp_dir
        except Exception as e:
            # If ffmpeg fails, log the error and clean up the directory
            logging.error(f"Error during video conversion: {e}")
            shutil.rmtree(temp_dir, ignore_errors=True)
            return None

    def convert_and_upload(input_path, res_label, res_value):
        """Convert one resolution and upload its files.

        Returns ``(res_label, playlist_url)``; ``playlist_url`` is None when
        conversion failed or the playlist could not be uploaded.
        """
        output_dir = convert_video(input_path, res_value, res_label)
        if output_dir is None:
            # convert_video already logged the failure and removed its temp
            # directory; bail out instead of handing None to os.walk.
            return res_label, None

        print("uploading to r2 baby!!", output_dir)
        playlist_url = None
        try:
            with ThreadPoolExecutor() as executor:
                futures = []
                for root, _dirs, files in os.walk(output_dir):
                    for file in files:
                        full_path = os.path.join(root, file)
                        file_key = f"{base_path}/{res_label}/{file}"
                        # The main playlist is any .m3u8 except the subtitle one.
                        is_playlist = (
                            file.endswith('.m3u8')
                            and 'index_vtt.m3u8' not in file
                        )
                        future = executor.submit(upload_file_to_r2, full_path, file_key)
                        futures.append((future, is_playlist))

                for future, is_playlist in futures:
                    result = future.result()
                    if result and is_playlist:
                        playlist_url = result
                    elif result:
                        print(f"Uploaded: {result}")
                    else:
                        print("Failed to upload some files.")
        finally:
            # The rendition directory is only needed until the uploads finish.
            shutil.rmtree(output_dir, ignore_errors=True)
        return res_label, playlist_url

    video_urls = {}
    input_path = None
    try:
        input_path = download_file_from_r2(video_key_r2)
        resolutions = {'1080p': '1920x1080', '720p': '1280x720'}

        # One worker per resolution so the renditions are produced in parallel.
        with ThreadPoolExecutor(max_workers=len(resolutions)) as executor:
            future_to_label = {
                executor.submit(convert_and_upload, input_path, label, value): label
                for label, value in resolutions.items()
            }
            for future in as_completed(future_to_label):
                res_label = future_to_label[future]
                try:
                    _, playlist_url = future.result()
                    if playlist_url:
                        video_urls[res_label] = playlist_url
                except Exception as exc:
                    logging.error(f'Error processing resolution {res_label}: {exc}')
    except Exception as e:
        logging.error(f"Video processing failed for {video_key_r2}: {e}")
        return {"error": str(e)}
    finally:
        # Clean up the local source file on success and on failure alike.
        if input_path is not None:
            try:
                os.remove(input_path)
            except OSError as cleanup_error:
                logging.warning(f"Could not remove {input_path}: {cleanup_error}")

    print("processed successfully", video_urls)
@stub.function()
@modal.web_endpoint(method="POST")
def process(video: RequestBody):
    """POST endpoint that starts processing of the video named in the body.

    Passes ``video.video_key_r2`` to ``process_video_in_background`` through
    ``.spawn()`` and replies with a fixed acknowledgement; the result of the
    spawned call is not read here.
    """
    source_key = video.video_key_r2
    process_video_in_background.spawn(video_key_r2=source_key)

    acknowledgement = {"message": "Video is being processed!"}
    return acknowledgement
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment