Skip to content

Instantly share code, notes, and snippets.

@aweffr
Last active May 15, 2025 10:57
Show Gist options
  • Save aweffr/55d966c314316eb1dcdc4f40c112ae39 to your computer and use it in GitHub Desktop.
Save aweffr/55d966c314316eb1dcdc4f40c112ae39 to your computer and use it in GitHub Desktop.
yt-dlp download service
"""
YT-DLP FastAPI Server
=====================
A FastAPI-based web service for downloading videos from various platforms using yt-dlp.
Features include concurrent download management, progress tracking, API key authentication,
task persistence, and disk space management.
Installation
-----------
Install the required dependencies:
pip install fastapi~=0.115.2
pip install pydantic~=2.11.4
pip install uvicorn~=0.34.2
pip install yt-dlp~=2025.4.30
Usage
-----
Start the server with default settings:
python app.py --api-key your_secret_key
Or with custom settings:
python app.py --host 127.0.0.1 --port 8080 --max-downloads 3 --api-key-file /path/to/keyfile
Available arguments:
--host Host to bind the server to (default: 0.0.0.0)
--port Port to bind the server to (default: 8000)
--api-key API key for securing the /download endpoint
--api-key-file Path to file containing the API key (more secure)
--max-downloads Maximum number of concurrent downloads (default: 5)
--cookies-file Path to cookies.txt file to bypass YouTube's robot detection
API Key can also be provided via YT_DLP_API_KEY environment variable.
API Endpoints
------------
POST /download - Queue a new download (requires API key)
GET /status/{id} - Check status of a specific download
GET /tasks - List all downloads
GET /media/{file} - Access downloaded files
Example:
curl -X POST "http://localhost:8000/download" \\
-H "X-API-KEY: your_secret_key" \\
-H "Content-Type: application/json" \\
-d '{"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}'
Downloads are stored in the ./media-download directory by default.
"""
import asyncio
import os
import uuid
import argparse
import logging
import shutil
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, Any, Optional
from fastapi import FastAPI, HTTPException, Security, Depends, BackgroundTasks, Request
from fastapi.security.api_key import APIKeyHeader
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn
import yt_dlp
# --- Configuration ---
MEDIA_DOWNLOAD_DIR = Path("./media-download")           # where finished files land
DEFAULT_MAX_CONCURRENT_DOWNLOADS = 5                    # overridden by --max-downloads
TASK_RETENTION_DAYS = 7                                 # keep completed tasks this long in memory
DISK_SPACE_MINIMUM_GB = 1.0                             # refuse new downloads below this free space
TASKS_PERSISTENCE_FILE = Path("./tasks_state.json")     # on-disk snapshot of task state
COOKIES_FILE = None                                     # set from --cookies-file at startup

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)
# Separate logger so yt-dlp's internal messages can be filtered independently.
yt_dlp_logger = logging.getLogger('yt_dlp_process')

# --- Global State & Concurrency Control ---
tasks: Dict[str, Dict[str, Any]] = {}                   # task_id -> mutable task record
download_semaphore: Optional[asyncio.Semaphore] = None  # created once CLI/env config is known
# --- Pydantic Models ---
class DownloadRequest(BaseModel):
    """Request body for POST /download: the media URL to fetch."""
    url: str = Field(..., examples=["https://www.youtube.com/watch?v=dQw4w9WgXcQ"])  # Example for OpenAPI schema
class TaskInfo(BaseModel):
    """Public view of a download task, returned by /status/{id} and /tasks."""
    task_id: str
    status: str  # "pending" | "downloading" | "completed" | "failed"
    progress_percent: Optional[float] = None
    progress_eta_str: Optional[str] = None   # human-readable ETA, e.g. "01:05"
    progress_speed_str: Optional[str] = None  # human-readable speed from yt-dlp
    total_bytes: Optional[int] = None
    downloaded_bytes: Optional[int] = None
    filename: Optional[str] = None  # basename of the output file, once known
    original_url: str
    error_message: Optional[str] = None
    download_url: Optional[str] = None  # populated only for completed tasks
    created_at: str  # UTC ISO-8601
    updated_at: str  # UTC ISO-8601
class TaskCreationResponse(BaseModel):
    """Response for POST /download: the new task's id and initial status."""
    task_id: str
    status: str
    message: str
# --- API Key Authentication ---
# Name of the request header that must carry the API key.
API_KEY_NAME = "X-API-KEY"
# auto_error=False so a missing header reaches get_api_key and yields our own 403.
api_key_header_auth = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
EXPECTED_API_KEY: str = "" # Will be initialized from command-line arguments
async def get_api_key(api_key_header: Optional[str] = Security(api_key_header_auth)):
    """
    FastAPI dependency: validate the X-API-KEY header against EXPECTED_API_KEY.

    Returns the key on success. Raises HTTP 500 if the server has no key
    configured, HTTP 403 on a missing or mismatched key.
    """
    if not EXPECTED_API_KEY:  # Server-side configuration error
        logger.error("API Key not configured on the server.")
        raise HTTPException(status_code=500, detail="API Key not configured on server.")
    if api_key_header == EXPECTED_API_KEY:
        return api_key_header
    # NOTE(review): logging the attempted key leaks near-miss secrets into logs —
    # consider redacting.
    logger.warning(f"Invalid API Key attempt. Provided: '{api_key_header}'")
    raise HTTPException(status_code=403, detail="Could not validate credentials")
# --- FastAPI Application Instance ---
app = FastAPI(
    title="YT-DLP FastAPI Server",
    description="A FastAPI server to download media using yt-dlp with concurrent task management, API key authentication, and progress tracking.",
    version="1.1.0",
)
# --- Helper Functions ---
def get_current_utc_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (offset-aware, +00:00)."""
    return datetime.now(timezone.utc).isoformat()
def format_eta(seconds: Optional[float]) -> str:
    """
    Format an ETA in seconds as "HH:MM:SS", or "MM:SS" when under an hour.

    Returns "N/A" for None, negative, or non-numeric input, and for anything
    that cannot be converted to an int (e.g. NaN).
    """
    if seconds is None or not isinstance(seconds, (int, float)) or seconds < 0:
        return "N/A"
    try:
        secs = int(seconds)
        mins, secs = divmod(secs, 60)
        hours, mins = divmod(mins, 60)
        if hours > 0:
            return f"{hours:02d}:{mins:02d}:{secs:02d}"
        return f"{mins:02d}:{secs:02d}"
    except Exception:
        # Defensive: a bad value from a progress hook degrades to "N/A"
        # rather than crashing the download thread.
        return "N/A"
def get_free_disk_space_gb(path: Optional[Path] = None) -> float:
    """
    Return free disk space in GB for `path` (default: the media directory).

    The `path` parameter is a backward-compatible generalization; existing
    callers that pass no argument keep the original behavior.
    """
    target = MEDIA_DOWNLOAD_DIR if path is None else path
    stat = shutil.disk_usage(target)
    return stat.free / 1024 / 1024 / 1024  # bytes -> GB
async def cleanup_old_tasks():
    """
    Remove completed/failed tasks older than TASK_RETENTION_DAYS from memory.

    Comparison is done on the ISO-8601 `updated_at` strings; since all
    timestamps come from get_current_utc_iso() (same UTC offset), lexicographic
    order matches chronological order.
    """
    if not tasks:
        return
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=TASK_RETENTION_DAYS)
    cutoff_str = cutoff_date.isoformat()
    # Collect ids first so we never mutate `tasks` while iterating it.
    to_remove = [
        task_id for task_id, task in tasks.items()
        if (task["status"] in ["completed", "failed"] and
            task["updated_at"] < cutoff_str)
    ]
    for task_id in to_remove:
        del tasks[task_id]
    if to_remove:
        logger.info(f"Cleaned up {len(to_remove)} old tasks from memory.")
def save_tasks_state():
    """
    Persist the in-memory `tasks` dict to TASKS_PERSISTENCE_FILE as JSON.

    Best-effort: any failure is logged and swallowed so persistence problems
    never take down the server. Task records are JSON-serializable because
    all timestamps are stored as ISO strings.
    """
    try:
        with open(TASKS_PERSISTENCE_FILE, 'w') as f:
            json.dump(tasks, f)
        logger.info(f"Tasks state saved to {TASKS_PERSISTENCE_FILE}")
    except Exception as e:
        logger.error(f"Failed to save tasks state: {e}")
def load_tasks_state():
    """
    Restore persisted tasks from TASKS_PERSISTENCE_FILE into the `tasks` dict.

    Missing file is normal (fresh start). Loaded entries are merged into the
    existing dict rather than replacing it. Failures are logged and swallowed.
    """
    global tasks
    if not TASKS_PERSISTENCE_FILE.exists():
        logger.info("No tasks state file found. Starting with empty tasks.")
        return
    try:
        with open(TASKS_PERSISTENCE_FILE, 'r') as f:
            loaded_tasks = json.load(f)
        tasks.update(loaded_tasks)
        logger.info(f"Loaded {len(loaded_tasks)} tasks from {TASKS_PERSISTENCE_FILE}")
    except Exception as e:
        logger.error(f"Failed to load tasks state: {e}")
# --- yt-dlp Download Logic ---
def _update_task_progress_hook(task_id: str, hook_data: Dict[str, Any]):
    """
    Progress hook for yt-dlp; called synchronously from the download thread.

    Mutates the task record in the global `tasks` dict according to
    hook_data['status']: 'downloading' refreshes byte counts / percent / ETA /
    speed, 'finished' marks completion and records the final file path, and
    'error' marks the task failed.
    """
    task = tasks.get(task_id)
    if not task:
        logger.warning(f"Progress hook called for non-existent task_id: {task_id}")
        return
    task["updated_at"] = get_current_utc_iso()
    status = hook_data.get('status')
    if status == 'downloading':
        task['status'] = 'downloading'
        # Prefer the exact size; fall back to yt-dlp's estimate.
        total_bytes = hook_data.get('total_bytes') or hook_data.get('total_bytes_estimate')
        downloaded_bytes = hook_data.get('downloaded_bytes')
        if total_bytes:
            task['total_bytes'] = total_bytes
        if downloaded_bytes:
            task['downloaded_bytes'] = downloaded_bytes
        if task.get('total_bytes') and task['total_bytes'] > 0 and task.get('downloaded_bytes') is not None:
            task['progress_percent'] = round((task['downloaded_bytes'] / task['total_bytes']) * 100, 2)
        elif '_percent_str' in hook_data:  # Fallback if bytes are not available
            try:
                percent_str = hook_data['_percent_str'].replace('%', '').strip()
                task['progress_percent'] = float(percent_str)
            except ValueError:
                pass  # Keep previous or default if parsing fails
        task['progress_eta_seconds'] = hook_data.get('eta')  # ETA in seconds
        task['progress_eta_str'] = format_eta(task['progress_eta_seconds'])
        task['progress_speed_str'] = hook_data.get('_speed_str', 'N/A')
        # filename can appear here during download, especially for fragmented
        # downloads or if yt-dlp determines it early.
        if 'filename' in hook_data and not task.get('filename'):
            task['filename'] = Path(hook_data['filename']).name
        elif 'info_dict' in hook_data and 'filename' in hook_data['info_dict'] and not task.get('filename'):
            task['filename'] = Path(hook_data['info_dict']['filename']).name
    elif status == 'finished':
        # NOTE(review): yt-dlp may report 'finished' per downloaded format before
        # a merge step when video and audio are fetched separately — confirm
        # against yt-dlp's progress_hooks documentation.
        task['status'] = 'completed'
        task['progress_percent'] = 100.0
        # Ensure downloaded_bytes matches total_bytes on completion.
        task['downloaded_bytes'] = task.get('total_bytes', task.get('downloaded_bytes', 0))
        # yt-dlp provides the final path under 'filename' for 'finished' status.
        final_filepath_str = hook_data.get('filename')
        if final_filepath_str:
            final_filepath = Path(final_filepath_str)
            task['filename'] = final_filepath.name
            task['final_filepath'] = str(final_filepath.resolve())
        else:  # Fallback if filename is not in hook_data (should be rare)
            logger.warning(f"Task {task_id} finished but no filename in hook_data. Attempting to find it.")
            # Rely on the name captured during 'downloading'; without it the
            # completion cannot be verified.
            if not task.get('filename'):  # If filename was never set
                task['status'] = 'failed'
                task['error_message'] = "Download finished, but final filename could not be determined."
        logger.info(f"Task {task_id} completed. File: {task.get('filename', 'N/A')}")
    elif status == 'error':
        task['status'] = 'failed'
        task['error_message'] = "yt-dlp reported an error during download or processing."
        logger.error(f"Task {task_id} failed due to yt-dlp error. Hook data: {hook_data.get('error') or hook_data}")
async def execute_yt_dlp_download(task_id: str, url: str):
    """
    Run the yt-dlp download for `task_id` in a worker thread.

    Verifies free disk space first, streams progress into the task record via
    hooks, performs a post-run sanity check if the hooks never reported
    completion, and ALWAYS releases the global download semaphore (which was
    acquired by the /download endpoint before this coroutine was scheduled).
    """
    task = tasks[task_id]  # Assume task_id exists and is initialized
    # Check for available disk space before starting download.
    free_space_gb = get_free_disk_space_gb()
    if free_space_gb < DISK_SPACE_MINIMUM_GB:
        logger.error(f"Task {task_id}: Not enough disk space. Available: {free_space_gb:.2f}GB, Required minimum: {DISK_SPACE_MINIMUM_GB}GB")
        task['status'] = 'failed'
        task['error_message'] = f"Not enough disk space (available: {free_space_gb:.2f}GB)"
        task['updated_at'] = get_current_utc_iso()
        download_semaphore.release()  # release the slot taken by the endpoint
        return

    def progress_hook_wrapper(d):
        # Called by yt-dlp synchronously within its worker thread.
        _update_task_progress_hook(task_id, d)

    # yt-dlp options
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',  # Prioritize MP4
        'outtmpl': str(MEDIA_DOWNLOAD_DIR / '%(title)s.%(ext)s'),  # Output template
        'noplaylist': True,  # Download only single video if URL is a playlist
        'progress_hooks': [progress_hook_wrapper],
        'postprocessor_hooks': [progress_hook_wrapper],  # For merge status etc.
        'logger': yt_dlp_logger,  # Dedicated logger for yt-dlp's own messages
        'noprogress': False,  # Ensure progress is reported
        'merge_output_format': 'mp4',  # Ensure merged output is mp4
        'retries': 5,  # Number of retries for HTTP errors
        'fragment_retries': 5,  # Retries for fragments
        'socket_timeout': 30,  # Timeout for network operations
        'quiet': True,  # Suppress direct console output; we use hooks and our logger
        'verbose': False,
        'no_warnings': True,
    }
    # Add cookies file if specified.
    if COOKIES_FILE:
        ydl_opts['cookiefile'] = COOKIES_FILE
        logger.info(f"Task {task_id}: Using cookies file: {COOKIES_FILE}")
    try:
        logger.info(f"Task {task_id}: Starting download for URL: {url}")
        task['status'] = 'downloading'  # Explicitly set before blocking call
        task['updated_at'] = get_current_utc_iso()
        # Run the blocking yt-dlp download in a separate thread.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            await asyncio.to_thread(ydl.download, [url])
        if task['status'] == 'downloading':
            # Hooks never reported 'finished'; verify the file on disk before
            # declaring success.
            logger.warning(f"Task {task_id}: yt-dlp process finished, but status is still 'downloading'. Performing post-check.")
            if task.get('final_filepath') and Path(task['final_filepath']).exists():
                task['status'] = 'completed'
                task['progress_percent'] = 100.0
                logger.info(f"Task {task_id}: Post-check confirmed completion. File: {task['filename']}")
            elif task.get('filename'):
                potential_file = MEDIA_DOWNLOAD_DIR / task['filename']
                if potential_file.exists():
                    task['status'] = 'completed'
                    task['progress_percent'] = 100.0
                    task['final_filepath'] = str(potential_file.resolve())
                    logger.info(f"Task {task_id}: Post-check confirmed completion by finding {task['filename']}.")
                else:
                    task['status'] = 'failed'
                    task['error_message'] = "Download process ended, but final file could not be verified and status was 'downloading'."
                    logger.error(f"Task {task_id}: {task['error_message']}")
            else:
                task['status'] = 'failed'
                task['error_message'] = "Download process ended, status 'downloading', but no file information available to confirm completion."
                logger.error(f"Task {task_id}: {task['error_message']}")
        if task['status'] == 'completed' and task.get('total_bytes') is not None:
            task['downloaded_bytes'] = task['total_bytes']
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"Task {task_id}: yt-dlp DownloadError for URL {url}. Error: {str(e)}")
        task['status'] = 'failed'
        task['error_message'] = f"yt-dlp DownloadError: {str(e)}"
    except Exception as e:
        logger.error(f"Task {task_id}: Unexpected error during download for URL {url}. Error: {str(e)}", exc_info=True)
        task['status'] = 'failed'
        task['error_message'] = f"An unexpected error occurred: {str(e)}"
    finally:
        task["updated_at"] = get_current_utc_iso()
        if download_semaphore:
            download_semaphore.release()
        # NOTE(review): reads the private Semaphore._value attribute; used only
        # for a log line, but relies on asyncio internals.
        active_downloads = DEFAULT_MAX_CONCURRENT_DOWNLOADS - (download_semaphore._value if download_semaphore else 0)
        logger.info(f"Task {task_id} finished processing (status: {task['status']}). Semaphore released. Active downloads: {active_downloads}")
# --- API Endpoints ---
@app.on_event("startup")
async def startup_event():
    """
    Application startup hook.

    Initializes the download semaphore (when the __main__ path did not already
    create it, e.g. under an external ASGI server), loads the API key from the
    environment as a fallback, ensures the media directory exists, restores
    persisted task state, mounts the static /media route, and starts the
    periodic cleanup loop.
    """
    global download_semaphore, DEFAULT_MAX_CONCURRENT_DOWNLOADS, EXPECTED_API_KEY
    if download_semaphore is None:
        # Not started via __main__: read concurrency limit from the environment.
        try:
            cli_max_downloads = int(os.environ.get("YT_DLP_MAX_DOWNLOADS", DEFAULT_MAX_CONCURRENT_DOWNLOADS))
            DEFAULT_MAX_CONCURRENT_DOWNLOADS = cli_max_downloads
        except ValueError:
            logger.warning(f"Invalid YT_DLP_MAX_DOWNLOADS env var. Using default: {DEFAULT_MAX_CONCURRENT_DOWNLOADS}")
        download_semaphore = asyncio.Semaphore(DEFAULT_MAX_CONCURRENT_DOWNLOADS)
        logger.info(f"Semaphore initialized on startup event with value: {DEFAULT_MAX_CONCURRENT_DOWNLOADS}")
    if not EXPECTED_API_KEY:
        EXPECTED_API_KEY = os.environ.get("YT_DLP_API_KEY", "")
        if EXPECTED_API_KEY:
            logger.info("API Key loaded from environment variable")
    MEDIA_DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
    logger.info(f"Media download directory ensured at: {MEDIA_DOWNLOAD_DIR.resolve()}")
    load_tasks_state()
    app.mount("/media", StaticFiles(directory=MEDIA_DOWNLOAD_DIR, html=False), name="media")
    logger.info(f"Serving static files from '{MEDIA_DOWNLOAD_DIR.resolve()}' at '/media'")
    asyncio.create_task(periodic_cleanup())
async def periodic_cleanup():
    """
    Hourly background loop: prune old tasks, persist state, log system status.

    Runs forever; started from the startup event via asyncio.create_task.
    """
    while True:
        await asyncio.sleep(3600)  # once per hour
        await cleanup_old_tasks()
        save_tasks_state()
        logger.info(f"System status: Disk free: {get_free_disk_space_gb():.2f}GB, Active tasks: {len(tasks)}")
@app.on_event("shutdown")
async def shutdown_event():
    """Persist task state when the server shuts down."""
    save_tasks_state()
    logger.info("Server shutting down, tasks state saved.")
@app.post("/download", response_model=TaskCreationResponse, status_code=202, tags=["Download"])
async def create_download_task_endpoint(
    download_req: DownloadRequest,
    background_tasks: BackgroundTasks,
    api_key: str = Depends(get_api_key)
):
    """
    Accepts a URL (e.g., YouTube) and queues it for download.
    Requires a valid X-API-KEY header for authentication.
    Returns a task ID and initial status.

    NOTE: the handler awaits the download semaphore here (released later by
    execute_yt_dlp_download), so when the server is at max concurrency the
    HTTP response is delayed until a slot frees up.
    """
    if not download_semaphore:
        logger.error("Download semaphore not initialized. This is an internal server error.")
        raise HTTPException(status_code=500, detail="Server not properly configured (semaphore missing).")
    task_id = str(uuid.uuid4())
    current_time_iso = get_current_utc_iso()
    # Initial task record; mutated in place by the progress hooks.
    tasks[task_id] = {
        "status": "pending",
        "progress_percent": 0.0,
        "progress_eta_seconds": None,
        "progress_eta_str": "N/A",
        "progress_speed_str": "N/A",
        "total_bytes": 0,
        "downloaded_bytes": 0,
        "filename": None,
        "original_url": download_req.url,
        "error_message": None,
        "created_at": current_time_iso,
        "updated_at": current_time_iso,
        "final_filepath": None
    }
    logger.info(f"Task {task_id} created for URL: {download_req.url}. Awaiting semaphore acquisition.")
    await download_semaphore.acquire()
    # NOTE(review): Semaphore._value is private; used only for this log line.
    active_downloads = DEFAULT_MAX_CONCURRENT_DOWNLOADS - download_semaphore._value
    logger.info(f"Semaphore acquired for task {task_id}. Active downloads: {active_downloads}")
    background_tasks.add_task(execute_yt_dlp_download, task_id, download_req.url)
    return TaskCreationResponse(
        task_id=task_id,
        status="pending",
        message="Download task accepted and queued for processing."
    )
@app.get("/status/{task_id}", response_model=TaskInfo, tags=["Status"])
async def get_task_status_endpoint(task_id: str, request: Request):
    """
    Retrieve the status and progress of a specific download task.

    For completed tasks with a known filename, a 'download_url' pointing at
    the mounted /media route is included; otherwise it is None.
    Raises 404 for an unknown task_id.
    """
    task = tasks.get(task_id)
    if not task:
        logger.warning(f"Status requested for non-existent task_id: {task_id}")
        raise HTTPException(status_code=404, detail=f"Task ID '{task_id}' not found.")
    # Copy so the response shaping never mutates the live task record.
    response_data = task.copy()
    response_data["task_id"] = task_id
    if task["status"] == "completed" and task.get("filename"):
        base_url = str(request.base_url).rstrip('/')
        response_data["download_url"] = f"{base_url}/media/{task['filename']}"
    else:
        response_data["download_url"] = None
    return TaskInfo(**response_data)
@app.get("/tasks", response_model=list[TaskInfo], tags=["Status"])
async def list_all_tasks_endpoint(request: Request):
    """
    List all managed tasks (pending, downloading, completed, failed) with
    their current status, newest first by created_at.
    """
    if not tasks:
        return []
    # base_url is loop-invariant; compute it once.
    base_url = str(request.base_url).rstrip('/')
    response_list = []
    for task_id, task_data in tasks.items():
        data_copy = task_data.copy()  # never mutate the live record
        data_copy["task_id"] = task_id
        if data_copy["status"] == "completed" and data_copy.get("filename"):
            data_copy["download_url"] = f"{base_url}/media/{data_copy['filename']}"
        else:
            data_copy["download_url"] = None
        response_list.append(TaskInfo(**data_copy))
    response_list.sort(key=lambda t: t.created_at, reverse=True)
    return response_list
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="FastAPI server for yt-dlp downloads.")
    parser.add_argument(
        "--host",
        type=str,
        default="0.0.0.0",
        help="Host to bind the server to. Default: 0.0.0.0"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Port to bind the server to. Default: 8000"
    )
    parser.add_argument(
        "--api-key",
        type=str,
        help="API Key for securing the /download endpoint. Can also be set via YT_DLP_API_KEY env variable."
    )
    parser.add_argument(
        "--api-key-file",
        type=str,
        help="Path to file containing the API key (more secure than command line)"
    )
    parser.add_argument(
        "--max-downloads",
        type=int,
        default=DEFAULT_MAX_CONCURRENT_DOWNLOADS,
        help=f"Maximum number of concurrent downloads. Default: {DEFAULT_MAX_CONCURRENT_DOWNLOADS}"
    )
    parser.add_argument(
        "--cookies-file",
        type=str,
        help="Path to cookies.txt file to bypass YouTube's robot detection"
    )
    args = parser.parse_args()

    # Resolve the API key: file > flag > environment, in that order.
    if args.api_key_file:
        try:
            with open(args.api_key_file, 'r') as f:
                EXPECTED_API_KEY = f.read().strip()
            logger.info(f"API Key loaded from file: {args.api_key_file}")
        except Exception as e:
            logger.critical(f"Failed to read API key from file: {e}")
            exit(1)
    elif args.api_key:
        EXPECTED_API_KEY = args.api_key
    else:
        EXPECTED_API_KEY = os.environ.get("YT_DLP_API_KEY", "")
    if not EXPECTED_API_KEY:
        logger.critical("No API key provided. Use --api-key, --api-key-file, or YT_DLP_API_KEY env var.")
        exit(1)

    DEFAULT_MAX_CONCURRENT_DOWNLOADS = args.max_downloads
    download_semaphore = asyncio.Semaphore(args.max_downloads)
    # Expose the limit via env so the startup event agrees if the app is re-imported.
    os.environ["YT_DLP_MAX_DOWNLOADS"] = str(args.max_downloads)

    # Set cookies file if provided.
    if args.cookies_file:
        cookies_path = Path(args.cookies_file)
        if not cookies_path.exists():
            logger.critical(f"Cookies file not found: {args.cookies_file}")
            exit(1)
        COOKIES_FILE = str(cookies_path.resolve())
        logger.info(f"Using cookies file: {COOKIES_FILE}")

    logger.info(f"Starting YT-DLP FastAPI Server on {args.host}:{args.port}")
    logger.info(f"API Key configured. Use header '{API_KEY_NAME}: <your_api_key>' for /download.")
    logger.info(f"Maximum concurrent downloads set to: {args.max_downloads}")
    logger.info(f"Media will be downloaded to: {MEDIA_DOWNLOAD_DIR.resolve()}")
    uvicorn.run(app, host=args.host, port=args.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment