|
#!/usr/bin/env python3 |
|
""" |
|
HEVC Video Encoder with NVIDIA NVENC |
|
Drag and drop video files onto this script for batch processing |
|
""" |
|
## benchmarking |
|
# ffmpeg -i ".\Albion-Online.exe\output\Albion-Online.exe 2025.05.25 - 00.13.07.16.DVR.mp4" -ss 00:02:13 -t 00:00:30 -i ".\Albion-Online.exe\Albion-Online.exe 2025.05.25 - 00.13.07.16.DVR.mp4" -filter_complex "[0:v]format=yuv420p[dis],[1:v]format=yuv420p[ref],[dis][ref]libvmaf=n_threads=16" -f null - |
|
|
|
import sys |
|
import os |
|
import subprocess |
|
import json |
|
import shutil |
|
import argparse |
|
from pathlib import Path |
|
from datetime import datetime |
|
|
|
# Configuration Variables |
|
OUTPUT_DIR = "output" |
|
VIDEO_CODEC = "hevc_nvenc" |
|
AUDIO_CODEC = "copy" |
|
BITRATE = "10000k" |
|
CONTAINER = "mp4" |
|
PRESET = "slow" |
|
TWO_PASS = True |
|
|
|
def check_ffmpeg(): |
|
"""Check if ffmpeg is available""" |
|
if not shutil.which("ffmpeg"): |
|
print("ERROR: ffmpeg not found in PATH. Please install ffmpeg.") |
|
input("Press Enter to exit...") |
|
sys.exit(1) |
|
|
|
def check_nvenc(): |
|
"""Check if NVIDIA NVENC is available""" |
|
global VIDEO_CODEC |
|
try: |
|
cmd = [ |
|
"ffmpeg", "-f", "lavfi", "-i", "testsrc=duration=1:size=320x240:rate=1", |
|
"-c:v", "hevc_nvenc", "-f", "null", "-" |
|
] |
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) |
|
if "Cannot load" in result.stderr: |
|
print("WARNING: NVIDIA NVENC not available. Falling back to software encoding.") |
|
VIDEO_CODEC = "libx265" |
|
except Exception: |
|
print("WARNING: Could not test NVENC availability. Proceeding with NVENC...") |
|
|
|
def get_video_info(filepath): |
|
"""Get video information using ffprobe""" |
|
cmd = [ |
|
"ffprobe", "-v", "quiet", "-print_format", "json", |
|
"-show_format", "-show_streams", str(filepath) |
|
] |
|
|
|
try: |
|
result = subprocess.run(cmd, capture_output=True, text=True, check=True) |
|
data = json.loads(result.stdout) |
|
|
|
video_stream = next((s for s in data["streams"] if s["codec_type"] == "video"), None) |
|
|
|
if video_stream and "r_frame_rate" in video_stream: |
|
fps_parts = video_stream["r_frame_rate"].split("/") |
|
fps = round(float(fps_parts[0]) / float(fps_parts[1]), 3) |
|
else: |
|
fps = 30 |
|
|
|
duration = round(float(data["format"]["duration"]), 2) |
|
|
|
return {"fps": fps, "duration": duration} |
|
except Exception as e: |
|
print(f" Warning: Could not get video info - {e}") |
|
return {"fps": 30, "duration": 0} |
|
|
|
def encode_video(input_file, output_file, fps, stretch=False, crop=None): |
|
"""Encode video using ffmpeg""" |
|
base_args = [ |
|
"ffmpeg", "-i", str(input_file), |
|
"-y", |
|
"-v", "error", |
|
"-hide_banner", |
|
"-stats", |
|
"-pix_fmt", "yuv420p10le", |
|
"-g", "600", |
|
"-keyint_min", "600", |
|
"-c:v", VIDEO_CODEC, |
|
"-cq:v", "31", |
|
"-c:a", AUDIO_CODEC, |
|
"-r", str(fps), |
|
"-movflags", "+faststart" |
|
] |
|
|
|
video_filters = [] |
|
if stretch: |
|
video_filters.append("scale=trunc(ih*16/9/2)*2:ih,setdar=16/9") |
|
if crop: |
|
width, height = crop.split('x') |
|
video_filters.append(f"crop={width}:{height}") |
|
|
|
if video_filters: |
|
base_args.extend(["-vf", ",".join(video_filters)]) |
|
|
|
if VIDEO_CODEC == "hevc_nvenc": |
|
base_args.extend([ |
|
"-preset", "p7", |
|
"-rc", "vbr", |
|
"-tune", "hq", |
|
"-rc-lookahead", "20", |
|
"-2pass", "true", |
|
"-multipass", "fullres" |
|
]) |
|
elif VIDEO_CODEC == "libx265": |
|
base_args.extend(["-preset", PRESET, "-x265-params", "log-level=error", "-tune", "animation"]) |
|
|
|
if TWO_PASS and VIDEO_CODEC != "hevc_nvenc": |
|
print(" Pass 1/2...") |
|
pass1_args = base_args + ["-pass", "1", "-f", "null", os.devnull] |
|
subprocess.run(pass1_args, check=True) |
|
|
|
print(" Pass 2/2...") |
|
pass2_args = base_args + ["-pass", "2", str(output_file)] |
|
subprocess.run(pass2_args, check=True) |
|
|
|
for log_file in Path(".").glob("ffmpeg2pass-*.log*"): |
|
log_file.unlink(missing_ok=True) |
|
else: |
|
single_pass_args = base_args + [str(output_file)] |
|
subprocess.run(single_pass_args, check=True) |
|
|
|
|
|
def preserve_timestamps(source_file, target_file): |
|
"""Preserve file timestamps from source to target""" |
|
source_stat = source_file.stat() |
|
os.utime(target_file, (source_stat.st_atime, source_stat.st_mtime)) |
|
|
|
def find_videos_by_date(directory, date_from=None, date_to=None): |
|
"""Find video files in directory modified on or after target_date""" |
|
video_extensions = {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.mpg', '.mpeg'} |
|
video_files = [] |
|
|
|
dir_path = Path(directory) |
|
if not dir_path.exists(): |
|
print(f"Directory not found: {directory}") |
|
return [] |
|
|
|
date_from_timestamp = datetime.strptime(date_from, "%Y-%m-%d").timestamp() if date_from else None |
|
date_to_timestamp = datetime.strptime(date_to, "%Y-%m-%d").timestamp() if date_to else None |
|
|
|
for file_path in dir_path.glob('*'): |
|
if file_path.is_file() and file_path.suffix.lower() in video_extensions: |
|
file_timestamp = file_path.stat().st_mtime |
|
if date_from_timestamp and file_timestamp < date_from_timestamp: |
|
continue |
|
if date_to_timestamp and file_timestamp > date_to_timestamp: |
|
continue |
|
video_files.append(file_path) |
|
|
|
return sorted(video_files) |
|
|
|
def main(): |
|
parser = argparse.ArgumentParser( |
|
description="HEVC Video Encoder with NVIDIA NVENC", |
|
formatter_class=argparse.RawTextHelpFormatter |
|
) |
|
parser.add_argument("directory", help="Directory containing video files.") |
|
parser.add_argument("--date-from", help="Date to filter videos by (YYYY-mm-dd).") |
|
parser.add_argument("--date-to", help="Date to filter videos by (YYYY-mm-dd).") |
|
parser.add_argument("--stretch", action="store_true", help="Stretch video to 16:9 aspect ratio.") |
|
parser.add_argument("--crop", help="Crop video to specified resolution (e.g., '2560x1080').") |
|
|
|
if len(sys.argv) < 2: |
|
parser.print_help(sys.stderr) |
|
input("Press Enter to exit...") |
|
sys.exit(1) |
|
|
|
args = parser.parse_args() |
|
|
|
directory = args.directory |
|
date_from = args.date_from |
|
date_to = args.date_to |
|
stretch = args.stretch |
|
crop = args.crop |
|
|
|
if crop: |
|
try: |
|
width, height = crop.split('x') |
|
if not (width.isdigit() and height.isdigit()): |
|
raise ValueError("Width and height must be integers.") |
|
except ValueError as e: |
|
print(f"Invalid crop format: {e}. Use WxH (e.g., 2560x1080)") |
|
input("Press Enter to exit...") |
|
sys.exit(1) |
|
|
|
if date_from: |
|
try: |
|
datetime.strptime(date_from, "%Y-%m-%d") |
|
except ValueError: |
|
print("Invalid date format for --date-from. Use YYYY-mm-dd") |
|
input("Press Enter to exit...") |
|
return |
|
|
|
if date_to: |
|
try: |
|
datetime.strptime(date_to, "%Y-%m-%d") |
|
except ValueError: |
|
print("Invalid date format for --date-to. Use YYYY-mm-dd") |
|
input("Press Enter to exit...") |
|
return |
|
|
|
print(f"Searching for videos in: {directory}") |
|
if date_from: |
|
print(f"Modified on or after: {date_from}") |
|
if date_to: |
|
print(f"Modified on or before: {date_to}") |
|
|
|
input_files = find_videos_by_date(directory, date_from, date_to) |
|
|
|
if not input_files: |
|
print("No video files found matching criteria.") |
|
input("Press Enter to exit...") |
|
return |
|
|
|
print(f"Found {len(input_files)} video files:") |
|
for video in input_files: |
|
print(f" - {video.name}") |
|
print() |
|
|
|
# Initial checks |
|
check_ffmpeg() |
|
check_nvenc() |
|
|
|
total_files = len(input_files) |
|
processed_files = 0 |
|
success_count = 0 |
|
failed_files = [] |
|
|
|
print(f"Starting batch processing of {total_files} files...") |
|
print("Configuration:") |
|
print(f" Video Codec: {VIDEO_CODEC}") |
|
print(f" Audio Codec: {AUDIO_CODEC}") |
|
print(f" Two-Pass: {TWO_PASS}") |
|
print(f" Stretch to 16:9: {'Yes' if stretch else 'No'}") |
|
if crop: |
|
print(f" Crop: {crop}") |
|
print() |
|
|
|
for input_file in input_files: |
|
processed_files += 1 |
|
|
|
if not input_file.exists(): |
|
print(f"[{processed_files}/{total_files}] SKIP: File not found - {input_file}") |
|
failed_files.append(str(input_file)) |
|
continue |
|
|
|
base_name = input_file.stem |
|
input_dir = input_file.parent |
|
output_dir_path = input_dir / OUTPUT_DIR |
|
|
|
# Create output directory if it doesn't exist |
|
output_dir_path.mkdir(exist_ok=True) |
|
|
|
output_file = output_dir_path / f"{base_name}.{CONTAINER}" |
|
|
|
print(f"[{processed_files}/{total_files}] Processing: {input_file.name}") |
|
|
|
try: |
|
# Get video information |
|
video_info = get_video_info(input_file) |
|
print(f" Duration: {video_info['duration']}s, FPS: {video_info['fps']}") |
|
|
|
# Encode video |
|
start_time = datetime.now() |
|
encode_video(input_file, output_file, video_info['fps'], stretch, crop) |
|
end_time = datetime.now() |
|
duration = (end_time - start_time).total_seconds() |
|
|
|
if output_file.exists(): |
|
# Preserve original file timestamps |
|
preserve_timestamps(input_file, output_file) |
|
|
|
output_size = output_file.stat().st_size / (1024 * 1024) |
|
print(f" ✓ Completed in {duration:.1f}s - Size: {output_size:.1f} MB") |
|
success_count += 1 |
|
else: |
|
raise Exception("Output file was not created") |
|
|
|
except Exception as e: |
|
print(f" ✗ Failed: {e}") |
|
failed_files.append(str(input_file)) |
|
|
|
print() |
|
|
|
# Summary |
|
print("Batch processing complete!") |
|
print(f"Successfully processed: {success_count}/{total_files} files") |
|
|
|
if failed_files: |
|
print("Failed files:") |
|
for failed in failed_files: |
|
print(f" - {failed}") |
|
|
|
input("Press Enter to exit...") |
|
|
|
if __name__ == "__main__": |
|
main() |