Last active
October 5, 2023 17:08
-
-
Save in03/aea16ddccd04bb178b2ef988b73664b3 to your computer and use it in GitHub Desktop.
Encode proxies for DaVinci Resolve using FFmpeg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Windows: create a shortcut to a .bat file in 'shell:sendto' that calls this script with the selected files passed as arguments.
# Use with the 'save_proxy_clip_list.py' gist to quickly pull the clips used in a timeline into FFmpeg.
# Use the 'link_proxies.py' gist to relink the proxies correctly.
# A bug in the current Resolve release links clips wrongly to only one or two proxies.
# This assumes FFmpeg is on the PATH.
import os, sys, shutil | |
import subprocess | |
import argparse | |
import pathlib | |
import winsound | |
from winsound import Beep | |
from datetime import datetime | |
# Command-line interface. Parsing happens at import time, so `args` is a
# module-level value read by the code further down.
ap = argparse.ArgumentParser(
    description='Watchfolder or manually queued Resolve proxy encodes',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
ap.add_argument('path', nargs='+',
                help='Path of a file or a folder of files.')
# Both options are on/off switches. Without action="store_true" argparse
# requires a value after each one and consumes the next argument for it.
ap.add_argument("-d", "--dryrun", required=False, action="store_true",
                help="use to skip encoding for debug purposes")
ap.add_argument("-y", "--yes", required=False, action="store_true",
                help="assume yes at any prompts")
args = ap.parse_args()
# Globals:
###################################################
# Vars
acceptable_exts = ['.mp4', '.mov', '.mxf']  # source extensions that get queued
encodable = []
skipped = []
cwd = os.getcwd()
proxy_path_root = "S:\\ProxyMedia"  # root folder that receives proxies and the log
###################################################
# Logging path
# The log file lives inside the proxy root, so fail early when that root is
# missing or is not a directory (a plain file of that name would pass exists()).
if not os.path.isdir(proxy_path_root):
    raise FileNotFoundError(f"{proxy_path_root} does not exist. Please check write path.")
log_path = os.path.join(proxy_path_root, "ProxyEncoder.log")
def confirm(message):
    """Ask a yes/no question on the console until a usable answer is given.

    Args:
        message: Prompt text shown to the user.

    Returns:
        True when the answer starts with "y", False when it starts with "n"
        (case-insensitive, surrounding whitespace ignored).
    """
    # Loop instead of recursing: the recursive retry discarded its result, so
    # a valid answer given after an invalid one came back as None.
    while True:
        answer = input(message + "\n")
        print("\n")
        normalized = answer.strip().lower()
        # Match on the first letter; a substring test treats "no way" as yes.
        if normalized.startswith("y"):
            return True
        if normalized.startswith("n"):
            return False
        print(f"Invalid response, '{answer}'. Please answer 'yes' or 'no'")
def print_and_log(message, log_only=False, add_time=True):
    """Append a message to the log file and, unless log_only, print it too.

    Args:
        message: Text (or any object, e.g. an exception) to record.
        log_only: When True the message is written to the log but not printed.
        add_time: When True the log line is prefixed with the current
            timestamp; pass False for banners and separators.
    """
    if not log_only:
        print(message)
    # Honour add_time: previously every line was timestamped regardless.
    prefix = f"[{datetime.now()}] " if add_time else ""
    # UTF-8 so that media paths with non-ASCII characters can be logged.
    with open(log_path, "a", encoding="utf-8") as changelog:
        changelog.write(f"{prefix}{message} \n")
def _proxy_output_base(src_path):
    """Create the mirrored proxy folder for src_path and return the output path without extension."""
    absolute = pathlib.Path(os.path.abspath(src_path))
    # Drop only the drive/root anchor so the whole folder structure is
    # mirrored under proxy_path_root, for relative inputs as well.
    relative_dir = os.path.dirname(absolute.relative_to(absolute.anchor))
    output_dir = os.path.join(proxy_path_root, relative_dir)
    os.makedirs(output_dir, exist_ok=True)
    return os.path.join(output_dir, os.path.splitext(absolute.name)[0])


def get_vids(filepaths):
    """Sort the given files and folders into encodable and skipped media.

    Folders are walked recursively. A file is encodable when its extension
    (compared case-insensitively) is listed in acceptable_exts.

    Args:
        filepaths: Iterable of file or folder paths; a single path is also
            accepted.

    Returns:
        A (skipped, encodable) tuple built fresh for this call. skipped is a
        list of paths; encodable is a list of dicts with the keys
        "source_media" (the input path) and "output_file" (the proxy path
        without extension).
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(filepaths, (str, os.PathLike)):
        filepaths = [filepaths]

    allowed = {ext.casefold() for ext in acceptable_exts}
    found_skipped = []
    found_encodable = []

    for path in filepaths:
        print_and_log(f"Checking path: '{path}'", log_only=True)
        if os.path.isdir(path):
            print(f"Scanning directory: {path}")
            # os.walk yields (root, dirs, files) triples.
            candidates = [
                os.path.join(root, name)
                for root, _dirs, files in os.walk(path, topdown=True)
                for name in files
            ]
        else:
            candidates = [path]

        for candidate in candidates:
            extension = os.path.splitext(candidate)[1].casefold()
            if not os.path.isfile(candidate) or extension not in allowed:
                print_and_log(f"Skipping {candidate}")
                found_skipped.append(candidate)
                continue
            try:
                output_file = _proxy_output_base(candidate)
            except OSError as e:
                # Could not create the output folder; record it and carry on
                # with the remaining files.
                print_and_log(e)
                found_skipped.append(candidate)
                continue
            print_and_log(f"Queued {candidate}")
            found_encodable.append({"source_media": candidate, "output_file": output_file})

    return found_skipped, found_encodable
def parse_list(file):
    """Read a clip list text file and return its entries.

    Args:
        file: Path of a text file with one media path per line.

    Returns:
        List of lines with surrounding whitespace removed; blank lines are
        left out.
    """
    # The context manager closes the handle; it used to be left open.
    with open(file, 'r') as f:
        stripped = (line.strip() for line in f)
        return [line for line in stripped if line]
def encode(src_path, output_path):
    """Encode one source clip to a 1280x720 DNxHR SQ proxy in an MXF container.

    Args:
        src_path: Path of the source media file.
        output_path: Destination path without extension; ".mxf" is appended.

    Returns:
        True when ffmpeg exits successfully, False otherwise. Failures are
        logged and signalled with a low beep.
    """
    print("\n")
    filename = os.path.basename(src_path)
    print_and_log(f"Encoding '{filename}'")

    # Alternative ProRes settings, kept for reference:
    # -pix_fmt yuv422p10le -c:v prores_ks -profile:v 3 -vendor ap10 -vf scale=1280:720,fps=50/1
    # Argument list instead of a shell string: paths containing quotes or
    # other shell metacharacters are passed through untouched.
    command = [
        "ffmpeg.exe", "-y",
        "-i", str(src_path),
        "-c:v", "dnxhd", "-profile:v", "dnxhr_sq",
        "-vf", "scale=1280:720,fps=50/1,format=yuv422p",
        "-c:a", "pcm_s16le", "-ar", "48000",
        "-v", "warning", "-stats", "-hide_banner",
        f"{output_path}.mxf",
    ]

    try:
        # check=True turns a non-zero ffmpeg exit status into an exception;
        # without it a failed encode was reported as a success.
        subprocess.run(command, check=True)
    except (OSError, subprocess.SubprocessError) as e:
        print_and_log("\n\n----------------------------", add_time=False)
        print_and_log(e)
        print_and_log("----------------------------\n\n", add_time=False)
        print_and_log(f"Failed encoding: {src_path}")
        winsound.Beep(375, 150)  # Fail beep
        return False

    winsound.Beep(600, 200)  # Success beep
    return True
if __name__ == "__main__":
    new_encode = f"################# {datetime.now().strftime('%A, %d, %B, %y, %I:%M %p')} #################"
    print_and_log(new_encode, log_only=True, add_time=False)

    # Expand every .txt clip list into the paths it contains and keep all
    # other arguments as they are. A new list is built so that args.path is
    # not modified while it is being iterated.
    queue_paths = []
    for file in args.path:
        if os.path.splitext(file)[1].casefold() == ".txt":
            print(f"Parsing list from file '{file}'\n")
            txt_file_paths = parse_list(file)
            print(txt_file_paths)
            queue_paths.extend(txt_file_paths)
        else:
            queue_paths.append(file)

    # One call covers clip-list entries and loose files/folders alike, which
    # also works when no clip list was passed at all.
    skipped, found = get_vids(queue_paths)
    encodable = list(found)

    if not encodable:
        print_and_log("No encodable files found. Nothing to do.")
        sys.exit(0)

    # Confirm encode (get_vids has already listed each queued file)
    if not args.yes:
        if not confirm("Encode the above files?"):
            print("Aborting encode.")
            sys.exit(1)

    # Encode queued files
    for job in encodable:
        if args.dryrun:
            print_and_log(f"Dry run, skipping encode: {job['source_media']}")
            continue
        if encode(job['source_media'], job['output_file']):
            print_and_log(f"Successfully encoded: {job}")

    print(f"Done encoding. Check log file: '{log_path}'")

    # Finished jingle
    for i in range(1, 10):
        winsound.Beep(i * 100, 200)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Link proxies
# A bug in the current Resolve release links clips wrongly to only one or two proxies.
# Don't forget to update proxy_path_root below.
import os, sys | |
import traceback | |
import tkinter | |
import tkinter.messagebox | |
from tkinter import filedialog | |
from python_get_resolve import GetResolve | |
from datetime import datetime | |
# Folder the directory picker opens in.
proxy_path_root = "S:\\ProxyMedia"
# Extensions (lower case) treated as video files.
acceptable_exts = [".mov", ".mp4", ".mxf", ".avi"]


def get_proxy_path():
    """Ask the user for the folder containing the proxies to link.

    Returns:
        The chosen directory path. The process exits with status 0 when the
        dialog is cancelled.
    """
    # Hidden root window so that only the directory picker is shown.
    root = tkinter.Tk()
    root.withdraw()
    f = filedialog.askdirectory(initialdir=proxy_path_root, title="Link proxies")
    root.destroy()
    # A cancelled dialog yields an empty value rather than None, so test
    # truthiness instead of identity.
    if not f:
        print("User cancelled dialog. Exiting.")
        sys.exit(0)
    return f
def filter_videos(dir):
    """Collect every file below dir whose extension is in acceptable_exts.

    The tree is walked bottom-up and extensions are compared in lower case.

    Args:
        dir: Directory to search recursively.

    Returns:
        List of matching file paths, each joined with its containing folder.
    """
    matches = []
    for parent, _subdirs, filenames in os.walk(dir, topdown=False):
        candidates = (os.path.join(parent, filename) for filename in filenames)
        matches.extend(
            candidate
            for candidate in candidates
            if os.path.splitext(candidate)[1].lower() in acceptable_exts
        )
    return matches
def match_proxies(potential_proxies):
    """Link proxy files to the clips used on the current timeline's video tracks.

    A proxy is linked to a clip when the proxy's file name (without
    extension) equals the clip's source file name (without extension),
    ignoring case. Each media pool clip name is linked at most once.

    Args:
        potential_proxies: Iterable of proxy file paths.

    Returns:
        List of media pool clip names that were linked.
    """
    # Index proxies by lower-cased stem so every timeline item is a single
    # lookup. The first proxy seen for a stem wins.
    proxies_by_stem = {}
    for potential_proxy in potential_proxies:
        stem = os.path.splitext(os.path.basename(potential_proxy))[0].lower()
        proxies_by_stem.setdefault(stem, potential_proxy)

    linked = []
    track_len = timeline.GetTrackCount("video")
    print(f"Video track count: {track_len}")

    # Track indices start at 1 in the Resolve scripting API; range(track_len)
    # skipped the last track.
    for track_index in range(1, track_len + 1):
        items = timeline.GetItemListInTrack("video", track_index)
        if not items:
            continue

        for item in items:
            item_name = item.GetName()
            # Only consider items whose name mentions a known video extension.
            if not any(ext.lower() in item_name.lower() for ext in acceptable_exts):
                continue

            try:
                media = item.GetMediaPoolItem()
                name = media.GetName()
                path = media.GetClipProperty("File Path")
            except AttributeError:
                # No media pool item behind this timeline item.
                print(f"Skipping {item_name}, no linked media pool item.")
                continue

            if name in linked:
                continue

            clip_name = os.path.splitext(os.path.basename(path))[0]
            # Exact stem match: a substring test also links "C0001" to "C00010".
            proxy = proxies_by_stem.get(clip_name.lower())
            if proxy is None:
                continue

            proxy_name = os.path.splitext(os.path.basename(proxy))[0]
            print(f"Found match: {proxy_name} & {clip_name}")
            media.LinkProxyMedia(proxy)
            linked.append(name)

    return linked
if __name__ == "__main__":
    try:
        # Resolve handles are bound at module level, where the functions
        # above look them up.
        resolve = GetResolve()
        project = resolve.GetProjectManager().GetCurrentProject()
        timeline = project.GetCurrentTimeline()
        media_pool = project.GetMediaPool()
        if timeline is None:
            raise RuntimeError("No timeline is open in the current project.")

        proxy_dir = get_proxy_path()
        print(f"Passed directory: '{proxy_dir}'\n")
        potential_proxies = filter_videos(proxy_dir)
        print(potential_proxies)
        match_proxies(potential_proxies)
    except Exception as e:
        tb = traceback.format_exc()
        print(tb)
        # Hidden root so the message box does not bring up an empty Tk window.
        root = tkinter.Tk()
        root.withdraw()
        tkinter.messagebox.showinfo("ERROR", tb)
        print("ERROR - " + str(e))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Save proxy clip list
# Resolve currently has no way to create proxies for only the clips used in the active timeline.
# You can't right-click or create a smart bin to filter by usage and timeline.
# This creates a text file of the clips in the active timeline that encode_resolve_proxies can read.
# May change it to csv/json/yml someday.
# Don't forget to update clip_list_path.
import os, sys | |
import traceback | |
import tkinter | |
import tkinter.messagebox | |
from tkinter import filedialog | |
from python_get_resolve import GetResolve | |
from datetime import datetime | |
# Folder the save dialog opens in; created on demand.
clip_list_path = "S:\\ProxyMedia\\ClipLists"


def save_clip_list(clips):
    """Write the clip paths to a text file chosen through a save dialog.

    The suggested file name is "<project>_<timeline>_cliplist.txt". The
    process exits with status 0 when the dialog is cancelled.

    Args:
        clips: Iterable of clip paths; each is written on its own line.
    """
    os.makedirs(clip_list_path, exist_ok=True)

    # Hidden root window so that only the save dialog is shown.
    root = tkinter.Tk()
    root.withdraw()
    f = filedialog.asksaveasfile(
        initialdir=clip_list_path,
        initialfile=f"{project.GetName()}_{timeline.GetName()}_cliplist.txt",
        title="Generate clip list",
        filetypes=(("txt file", "*.txt"), ("all files", "*.*")),
    )
    root.destroy()

    if f is None:
        sys.exit(0)

    # The context manager closes the file even if a write fails.
    with f:
        f.writelines(f"{str(clip)}\n" for clip in clips)
def get_media_paths():
    """Collect the source file paths of the clips on the current timeline's video tracks.

    Only items whose name contains one of the accepted video extensions are
    considered. Paths are returned once each, in first-seen order.

    Returns:
        List of "File Path" clip property values.
    """
    acceptable_exts = [".mov", ".mp4", ".mxf", ".avi"]
    media_paths = []

    track_len = timeline.GetTrackCount("video")
    print(f"Video track count: {track_len}")

    # Track indices start at 1 in the Resolve scripting API; range(track_len)
    # skipped the last track.
    for track_index in range(1, track_len + 1):
        items = timeline.GetItemListInTrack("video", track_index)
        if not items:
            continue

        for item in items:
            item_name = item.GetName()
            if not any(ext.lower() in item_name.lower() for ext in acceptable_exts):
                continue

            try:
                media = item.GetMediaPoolItem()
                path = media.GetClipProperty("File Path")
            except AttributeError:
                # No media pool item behind this timeline item.
                print(f"Skipping {item_name}, no linked media pool item.")
                continue

            # The membership test keeps the list free of duplicates, so no
            # separate de-duplication pass is needed. Empty paths are ignored.
            if path and path not in media_paths:
                media_paths.append(path)

    return media_paths
if __name__ == "__main__":
    try:
        # Resolve handles are bound at module level, where the functions
        # above look them up.
        resolve = GetResolve()
        project = resolve.GetProjectManager().GetCurrentProject()
        timeline = project.GetCurrentTimeline()

        # Gather the timeline's clip paths and write them out.
        save_clip_list(get_media_paths())
    except Exception as error:
        details = traceback.format_exc()
        print(details)

        # Show the traceback in a dialog, with the Tk root window hidden.
        dialog_root = tkinter.Tk()
        dialog_root.withdraw()
        tkinter.messagebox.showinfo("ERROR", details)
        print(f"ERROR - {error}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey there! I've since fleshed this out to a full github repository. The requirements are a little different though. I've used Celery and a server running a RabbitMQ broker to distribute jobs instead of the more naïve file-based approach above.
https://github.com/in03/Resolve-Proxy-Encoder