Created
February 1, 2024 19:20
-
-
Save baardev/3a430a46604d01b2b0ba1bf2b2afbb7b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
from urllib import request | |
import os, sys, getopt | |
from glob import glob | |
import tempfile as tmp | |
import shutil | |
from pathlib import Path | |
import more_itertools | |
from colorama import init, Fore | |
import subprocess | |
import websocket #! NOTE: websocket-client (https://github.com/websocket-client/websocket-client) | |
import uuid | |
import json | |
import urllib.request | |
import urllib.parse | |
from pprint import pprint | |
import calendar | |
import time | |
# host:port of the server that the /prompt and /history URLs in this file point at.
server_address = "127.0.0.1:8188"
# Random id sent as "client_id" with every queued prompt.
client_id = str(uuid.uuid4())

# colorama initialisation; done before any Fore.* colour codes are printed.
init()
def split_path(pstr):
    """
    Break a path string into its components.

    Args:
        pstr: File path, absolute or relative.

    Returns:
        dict with the keys:
            "dirname":  directory part; the current working directory is
                        substituted when the path has none (or it is ".").
            "basename": final path component.
            "ext":      text after the last "." of the basename (the whole
                        basename when it contains no ".").
            "nameonly": basename up to, but not including, the last "."
                        (empty when the basename contains no ".").
            "fullpath": "<dirname>/<basename>".
    """
    dirname = os.path.dirname(pstr)
    if dirname in ("", "."):
        dirname = os.getcwd()
    basename = os.path.basename(pstr)
    # rpartition keeps interior dots intact ("a.b.mp4" -> "a.b"); splitting on
    # every "." and re-joining with "" would silently drop them ("ab").
    nameonly, _, ext = basename.rpartition(".")
    return {
        "dirname": dirname,
        "basename": basename,
        "ext": ext,
        "nameonly": nameonly,
        "fullpath": f"{dirname}/{basename}",
    }
def tryit(kwargs, arg, default):
    """
    Return kwargs[arg], or `default` when the lookup fails.

    Args:
        kwargs: Mapping (normally a **kwargs dict) to look in.
        arg: Key to look up.
        default: Value returned when the key is missing or `kwargs` cannot
            be subscripted at all (e.g. it is None).
    """
    try:
        return kwargs[arg]
    # Only lookup failures are treated as "missing"; a bare except would also
    # swallow KeyboardInterrupt and SystemExit.
    except (KeyError, IndexError, TypeError):
        return default
def prunlive(cmd, **kwargs):
    """
    Run a command and echo its stdout live, coloured red.

    The command string is split on whitespace and executed without a shell.
    Inside each resulting argument every "~" is turned into a space and every
    double quote is removed, so "~" can stand in for a space that must stay
    inside a single argument.

    Args:
        cmd: Command line as one string.
        debug (keyword): when truthy, print the command in yellow before
            running it.
        dryrun (keyword): when equal to "print", print the command in yellow
            and return without running anything.

    Returns:
        None.
    """
    debug = tryit(kwargs, "debug", False)
    dryrun = tryit(kwargs, "dryrun", False)
    if dryrun == "print":
        print(Fore.YELLOW + cmd + Fore.RESET)
        return

    scmd = [part.replace("~", " ").replace('"', "") for part in cmd.split()]
    if debug:
        print(Fore.YELLOW + cmd + Fore.RESET)

    # The context manager closes the stdout pipe and waits for the child when
    # the block exits, so no zombie process or open descriptor is left behind.
    with subprocess.Popen(scmd, stdout=subprocess.PIPE) as process:
        for line in process.stdout:
            print(Fore.RED, end="")
            # errors="replace": a stray non-UTF-8 byte in the tool's output
            # must not abort the whole run.
            sys.stdout.write(line.decode("utf-8", errors="replace"))
            print(Fore.RESET, end="")
def get_timestamp():
    """Return the current time as whole seconds since the Unix epoch (UTC)."""
    return calendar.timegm(time.gmtime())
#! https://github.com/comfyanonymous/ComfyUI/blob/master/script_examples/websockets_api_example.py | |
def get_history(prompt_id):
    """
    GET http://<server_address>/history/<prompt_id> and return the decoded
    JSON body.
    """
    url = "http://{}/history/{}".format(server_address, prompt_id)
    with urllib.request.urlopen(url) as response:
        payload = response.read()
        return json.loads(payload)
def get_queue_id():
    """
    Queue the module-level `prompt` and return the 'prompt_id' field of the
    server's reply.

    Relies on a global named `prompt` being set before the call (the
    __main__ section of this file assigns one); calling it earlier raises
    NameError.
    """
    # Return the id so the caller can use it, rather than computing it and
    # discarding it.
    return queue_prompt(prompt)['prompt_id']
def queue_prompt(prompt):
    """
    POST a prompt to http://<server_address>/prompt.

    Args:
        prompt: JSON-serialisable prompt object; it is sent together with
            this process's `client_id`.

    Returns:
        The server's reply decoded from JSON (callers in this file read its
        'prompt_id' entry).
    """
    p = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(p).encode('utf-8')
    # Supplying `data` makes urllib issue a POST.
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    # Close the HTTP response deterministically, as get_history does.
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())
#! https://github.com/comfyanonymous/ComfyUI/blob/master/script_examples/basic_api_example.py | |
def showhelp():
    """
    Print the command-line usage and exit the program.

    Raises:
        SystemExit: always (exit status 0).
    """
    usage = """
    -h, --help          show help
    -v, --video         path of the source video
    -d, --debug
    -s, --stage         'prep'|'anim'|'merge'
    -a, --atmpl         file name of the anim template
    -x, --experimental  'fswap'
    """
    # Print the option table itself; building it without printing leaves the
    # user with nothing but the word "help".
    print(usage)
    sys.exit()
# v ──────────────────────────────────────────────────────────────────────────────────────────────────────────── | |
# [ get args | |
argv = sys.argv[1:]
try:
    opts, args = getopt.getopt(
        argv,
        "hv:ds:a:x:",
        [
            'help',
            'video=',
            'debug',
            'stage=',
            'atmpl=',
            'experimental=',
        ],
    )
except getopt.GetoptError as e:
    # An unknown option or a missing argument leaves `opts` undefined, so
    # stop here instead of falling through to a NameError in the loop below.
    print(str(e))
    showhelp()

# Defaults, overridden by the command-line options below.
mtype = False
stage = False
debug = False
video = "/home/jw/Videos/0_ORIGINAL.mp4"
anim_template_name = "anim_pre_workflow_02_API.json"
experimental = False

for opt, arg in opts:
    if opt in ("-h", "--help"):
        showhelp()
    elif opt in ("-v", "--video"):
        video = arg
    elif opt in ("-d", "--debug"):
        debug = True
    elif opt in ("-s", "--stage"):
        stage = arg
    elif opt in ("-a", "--atmpl"):
        anim_template_name = arg
    elif opt in ("-x", "--experimental"):
        experimental = arg

# A stage is mandatory; an empty string counts as missing too.
if not stage:
    print("-s, --stage missing")
    showhelp()

groupsize = 26  # anim: number of frames per group
interpx = 6  # merge: value handed to createInterps
# Alternative preset: groupsize = 16, interpx = 4
extract_count = 0  # 0 = all
overlap = 0  # deprecated
fps = 8

# Scratch area: tempfile names and the per-group frame folders live here.
os.environ["TMPDIR"] = "/fstmp"
settings_dir = "/home/jw/src/sdc/settings/COMFY"
output_dir = "/home/jw/src/ComfyUI/output"
prep_template_name = "p_ZZ_API.json"
mp4_output_wc = "p_??_*.mp4"  # glob for the generated clips inside output_dir
def maketmpname(str, **kwargs):
    """
    Build a unique temp path whose last component starts with "<str>_".

    The path is placed in tempfile's default temp directory and has the form
    "<tempdir>/<str>_tmpXXXXXXXX".

    Args:
        str: Prefix for the generated name. (The parameter shadows the
            builtin; the name is kept so existing callers are unaffected.)
        create (keyword): when truthy, the path is created as a directory.

    Returns:
        The generated path as a string.
    """
    prefix = f"{str}_tmp"
    if kwargs.get("create", False):
        # mkdtemp picks the name and creates the directory in one atomic
        # step (owner-only permissions), closing the window between choosing
        # a name with mktemp and calling os.mkdir on it.
        return tmp.mkdtemp(prefix=prefix)
    # Name only: nothing is created, so the caller must tolerate the path
    # being taken by someone else before it is used.
    return tmp.mktemp(prefix=prefix)
def get_sorted_files(spec):
    """
    Expand the glob pattern `spec` and return the matching paths as a
    sorted list.
    """
    return sorted(glob(spec))
def get_video_frames(video, **kwargs):
    """
    Dump the frames of `video` as PNG files into a fresh "EXT" temp
    directory (via ffmpeg, at the module-level `fps` rate) and return their
    paths, sorted.

    Keyword args:
        count: keep only the first `count` frames; 0 (default) keeps all.
        debug: print the created directory and pass debug on to prunlive.
    """
    limit = tryit(kwargs, 'count', 0)
    verbose = tryit(kwargs, 'debug', False)

    frame_dir = maketmpname("EXT", create=True)
    if verbose:
        print(f"made dir: {frame_dir}")

    prunlive(
        f"ffmpeg -y -loglevel warning -i {video} -r {fps}/1 {frame_dir}/%04d.png",
        debug=verbose,
    )

    frames = get_sorted_files(f"{frame_dir}/*png")
    return frames if limit == 0 else frames[:limit]
def cleandir(dir):
    """
    Delete everything matched by the glob pattern `dir`: regular files are
    unlinked, directories are removed recursively. The list of matches is
    printed first.
    """
    matches = get_sorted_files(dir)
    print(matches)
    for entry in matches:
        if os.path.isfile(entry):
            os.unlink(entry)
        elif os.path.isdir(entry):
            shutil.rmtree(entry)
def createInterps(last, first, **kwargs):
    """
    Generate interpolated frames between two existing frames.

    `last` and `first` are copied into a fresh input directory as 1.png and
    2.png, the external interpolation script is run on it, and the PNG files
    it leaves in the output directory are returned as a sorted list.

    Keyword args:
        interpx: third argument handed to the interpolation script (default 8).
        debug: print progress; when exactly True, also paste a marker image
            onto each generated frame.
    """
    factor = tryit(kwargs, 'interpx', 8)
    verbose = tryit(kwargs, 'debug', False)

    #! working directories for the interpolation script
    indir = maketmpname("IN", create=True)
    if verbose:
        print(f"made indir: {indir}")
    outdir = maketmpname("OUT", create=True)
    if verbose:
        print(f"made outdir: {outdir}")

    #! stage the two boundary frames as 1.png / 2.png
    for source, target in ((last, indir + "/1.png"), (first, indir + "/2.png")):
        if verbose:
            print(f"copying [{source}] => [{target}]")
        shutil.copy(source, target)

    #! call interp script (runs in conda env 'rife')
    prunlive(f"/home/jw/src/rife/simple_interp.sh {indir} {outdir} {factor}", debug=verbose)
    files = get_sorted_files(f"{outdir}/*.png")

    if verbose != True:
        return files

    #! for debugging, mark interp images with icon
    from PIL import Image, ImageDraw, ImageFilter
    marker = Image.open('/home/jw/share/dot512.png')
    for path in files:
        stamped = Image.open(path).copy()
        stamped.paste(marker, (10, 10))
        stamped.save(path, quality=95)
    return files
def extract_video(video):
    """
    Extract the frames of a generated clip into a temp directory named after
    the clip and return the PNG paths, sorted.

    Args:
        video: Path of the clip to extract (frames are sampled at the
            module-level `fps`).
    """
    parts = split_path(video)
    exdir = maketmpname(parts["nameonly"], create=True)
    cmd = f"ffmpeg -y -loglevel panic -i {video} -r {fps}/1 {exdir}/%04d.png"
    # The keyword must be spelled `debug`: prunlive ignores unknown keywords,
    # so a misspelling silently disables the command echo.
    prunlive(cmd, debug=True)
    return get_sorted_files(f"{exdir}/*.png")
def wait_until_finished(prompt_id, poll_interval=5):
    """
    Block until get_history(prompt_id) returns a non-empty result.

    One "." is printed per poll as a progress indicator.

    Args:
        prompt_id: Id returned when the prompt was queued.
        poll_interval: Seconds to wait between polls (default 5).

    Returns:
        True once the history is non-empty. There is no timeout.
    """
    while True:
        history = get_history(prompt_id)
        print(".", end="", flush=True)
        if history:
            break
        # Sleep only while still waiting; once the result is in there is no
        # reason to delay the caller by another interval.
        time.sleep(poll_interval)
    print("\n")
    return True
def save_prompt(prompt, str, n):
    """
    Write `prompt` as 4-space-indented JSON to
    /home/jw/src/ComfyUI/output/__<str>_<n>.json.
    """
    serialized = json.dumps(prompt, indent=4)
    target = Path(f"/home/jw/src/ComfyUI/output/__{str}_{n}.json")
    target.write_text(serialized)
def update_template(prompt_text, i):
    """
    Point a template at group `i` and parse it.

    Every "HED_00", "POSE_00" and "SEG_00" in the template text is rewritten
    with `i` as a zero-padded two-digit number; the result is parsed as JSON
    and returned.
    """
    for tag in ("HED", "POSE", "SEG"):
        prompt_text = prompt_text.replace(f"{tag}_00", f"{tag}_{i:02d}")
    return json.loads(prompt_text)
if __name__ == "__main__":
    # Script entry point. Every run empties the scratch area and extracts the
    # source video's frames, then performs the stage chosen with -s/--stage:
    #   prep  - queue one prep prompt per frame group
    #   anim  - copy each group into its own folder, queue the anim prompt
    #           and wait for it to finish
    #   merge - extract the generated clips, interpolate across each clip
    #           boundary and encode everything into final.mp4
    timestamp = get_timestamp()
    cleandir(f"{os.environ['TMPDIR']}/*")
    files = get_video_frames(video, count=extract_count)
    tdirs = []

    #! now split into groups
    fgroups = list(more_itertools.chunked(files, groupsize))
    print(f"Created [{len(fgroups)}] groups of [{groupsize}] from [{len(files)}] frames")

    if stage == "anim":
        cleandir(f"{output_dir}/p_*")

    # NOTE(review): the final group is never processed (range stops at
    # len-1). Presumably deliberate because the last chunk can be shorter
    # than groupsize - confirm.
    for i in range(0, len(fgroups) - 1):
        print(Fore.MAGENTA + f"██████████████████████████████████████████████ {i}/{len(fgroups)-1} " + Fore.RESET)

        if stage == "prep":
            print(Fore.YELLOW + f"██████████████████████████████████████████████ PREPPING " + Fore.RESET)
            #! load and run prep template prompt. This creates the ControlNet input images in 'output_dir'
            prompt_text = Path(f"{settings_dir}/{prep_template_name}").read_text()
            prompt = update_template(prompt_text, i)
            prompt_id = queue_prompt(prompt)['prompt_id']
            #! manually move output dirs to input

        if stage == "anim":
            print(Fore.YELLOW + f"██████████████████████████████████████████████ ANIMATING " + Fore.RESET)
            #! create folder by group; 00, 01, 02, ...
            tmpdir = f"{os.environ['TMPDIR']}/{i:02d}"
            print(f"Target: {tmpdir}")
            os.mkdir(tmpdir)
            tdirs.append(tmpdir)

            #! copy and rename/renumber subset of files to folder (001.png, 002.png, ...)
            for j, frame in enumerate(fgroups[i], start=1):
                shutil.copy(frame, f"{tmpdir}/{j:03d}.png")

            #! submit the prompt
            prompt_text = Path(f"{settings_dir}/{anim_template_name}").read_text()  #! load template
            prompt = update_template(prompt_text, i)

            #! adjust accordingly (the keys are entries of the loaded template)
            prompt['9']['inputs']['batch_size'] = groupsize
            prompt['55']['inputs']['image_load_cap'] = groupsize
            prompt['119']['inputs']['frame_rate'] = fps
            prompt['133']['inputs']['image_load_cap'] = groupsize
            prompt['137']['inputs']['image_load_cap'] = groupsize

            #! this is where we assign the new seed image to the last image of the previous group
            if experimental == "fswap":
                if i > 0:
                    newseed = f"{tdirs[i-1]}/{groupsize:03d}.png"
                    prompt['30']['inputs']['image'] = newseed
                    if debug:
                        print(f"Setting seed image to [{newseed}]")

            save_prompt(prompt, stage, i)
            prompt_id = queue_prompt(prompt)['prompt_id']

            #! WAIT FOR THE QUEUE TO COMPLETE
            wait_until_finished(prompt_id)

    if stage == "merge":
        print(Fore.YELLOW + f"██████████████████████████████████████████████ MERGING " + Fore.RESET)
        allfiles = []  # frame lists in final playback order
        exdirimgs = []  # one sorted frame list per generated clip

        files = get_sorted_files(f"{output_dir}/{mp4_output_wc}")
        for file in files:
            exdirimgs.append(extract_video(file))

        for i in range(len(exdirimgs) - 1):
            allfiles.append(exdirimgs[i])
            last = exdirimgs[i][-1]
            first = exdirimgs[i + 1][0]
            print(f"Interpolating: {last} <=> {first} {interpx}x")
            newfiles_ary = createInterps(last, first, interpx=interpx, debug=debug)
            #! add the new files to the end of ary
            allfiles.append(newfiles_ary)

        # The loop above adds every clip except the last one (it only runs up
        # to the final clip boundary), so append the last clip's frames here;
        # otherwise the end of the video is missing from final.mp4.
        if exdirimgs:
            allfiles.append(exdirimgs[-1])

        #! now rename in sequence
        tmpdir = maketmpname("FINAL", create=True)
        k = 0
        for group in allfiles:
            for filename in group:
                # Six digits: ffmpeg's glob input is ordered by name, and
                # three-digit names stop sorting correctly past frame 999.
                shutil.move(filename, f"{tmpdir}/{k:06d}.png")
                k += 1

        cmd = f"ffmpeg -y -loglevel warning -hide_banner -hwaccel auto -y -framerate {fps} -pattern_type glob -i {tmpdir}/*.png -r {fps} -vcodec libx264 -preset medium -crf 23 -vf minterpolate=mi_mode=blend,fifo -pix_fmt yuv420p -movflags +faststart {output_dir}/final.mp4"
        prunlive(cmd, debug=debug)
        print("FINAL VIDEO: " + Fore.CYAN + f"{output_dir}/final.mp4" + Fore.RESET)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment