Last active
May 31, 2022 14:17
-
-
Save CSutter5/8aee0c03c1b1977b07168cb659ce19cb to your computer and use it in GitHub Desktop.
r/Place timelapse creator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import queue | |
import time | |
import os | |
import cv2 | |
import numpy as np | |
import argparse | |
from tqdm import tqdm | |
# Command-line interface: lets the user pick the crop region, scale, frame
# rate, worker count and output file.
# Example: python fast\ timelapse.py -x 20 -y 679 --width 52 --height 80 --scale 8 --fps 45 -t 3 -o tux.mp4
def _build_parser():
    """Return the argument parser describing every supported option."""
    # (flags, keyword options) pairs, registered in help-output order.
    option_specs = (
        (("-o", "--output"), dict(help="Output file name", dest="out", default="output.mp4")),
        (("-f", "--fps"), dict(help="Frames per second", dest="fps", default=30, type=int)),
        (("-x",), dict(help="X position", default=0, type=int)),
        (("-y",), dict(help="Y position", default=0, type=int)),
        (("--width",), dict(help="Width", default=2000, type=int)),
        (("--height",), dict(help="Height", default=2000, type=int)),
        (("--scale",), dict(help="Scale", default=2, type=int)),
        (("-t", "--threads"), dict(help="Number of threads", dest="threads", default=1, type=int)),
    )
    built = argparse.ArgumentParser()
    for flags, options in option_specs:
        built.add_argument(*flags, **options)
    return built


parser = _build_parser()
# --- Shared state used by the worker threads and the main thread ---

# Set to True by the main thread to tell the video writer loop to stop.
done = False

# Running totals of handled work items.
todoDone = 0
framesDone = 0

# Work queues: `todo` holds the images still waiting to be processed,
# `frames` holds the processed images waiting to be written to the video.
todo = queue.PriorityQueue()
frames = queue.PriorityQueue()

# Progress bars: one for processed images, one for frames written.
todoBar = tqdm(desc="Frames Processed", postfix=todoDone)
framesBar = tqdm(desc="Frames Finished", postfix=framesDone)
def upscale(img, factor):
    """Enlarge an image by an integer factor using pixel repetition.

    Every source pixel becomes a ``factor`` x ``factor`` square of identical
    pixels (nearest-neighbour scaling), so no colours are blended.

    Args:
        img: Array whose first two axes are rows and columns. Any trailing
            axes (for example colour channels) and the dtype are preserved,
            so grayscale, 3-channel and 4-channel images all work.
        factor: Integer scale multiplier.

    Returns:
        ``img`` itself when ``factor == 1``; otherwise a new array of shape
        ``(rows * factor, cols * factor, ...)``.
    """
    if factor == 1:
        return img

    # Repeat along the row axis, then the column axis; trailing axes such as
    # colour channels are carried along untouched.
    stretched_rows = np.repeat(img, factor, axis=0)
    return np.repeat(stretched_rows, factor, axis=1)
def frameProcessor(x, y, size, scale):
    """Worker loop: crop, upscale and colour-convert the queued images.

    Takes ``(stem, filename)`` entries from the global ``todo`` queue until
    it is empty. Each image is read from ``images/<filename>``, cropped to
    the region starting at column ``x`` / row ``y``, enlarged with
    ``upscale`` and put on the global ``frames`` queue as
    ``(filename, image)``. A failure on one image is printed and skipped so
    the remaining images are still processed.

    Args:
        x: Left column of the crop region.
        y: Top row of the crop region.
        size: ``(width, height)`` of the crop region in source pixels.
        scale: Integer factor passed to ``upscale``.
    """
    global todo, frames, todoDone, todoBar

    while True:
        # Non-blocking get: with several workers, checking empty() and then
        # calling a blocking get() can hang a worker forever when another
        # worker takes the last item in between.
        try:
            item = todo.get_nowait()
        except queue.Empty:
            break

        try:
            source = cv2.imread("images/" + item[1])
            if source is None:
                # imread signals an unreadable file by returning None.
                raise ValueError("could not read image")

            cropped = source[y:y + size[1], x:x + size[0]]
            # NOTE(review): imread without flags normally yields 3-channel
            # data, so confirm that an RGBA->RGB conversion is intended here.
            img = cv2.cvtColor(upscale(cropped, scale), cv2.COLOR_RGBA2RGB)

            # NOTE(review): the priority queue only orders frames that are
            # queued at the same time; with several workers, frames may
            # reach the writer out of filename order - confirm acceptable.
            frames.put((item[1], img))

            # Crude back-pressure: pause when the writer falls behind so
            # decoded frames do not pile up in memory.
            if frames.qsize() > 100:
                time.sleep(2)
        except Exception as e:
            # Report the real file name (item[1]); item[0] is only the stem.
            print(f"images/{item[1]}\n{e}")

        # Count the item as handled whether or not it produced a frame
        todoDone += 1
        todoBar.update()
        todo.task_done()
def addFrame(outFile, fps, size):
    """Writer loop: append processed frames to the output video.

    Opens a video writer (stored in the global ``out``) and writes frames
    taken from the global ``frames`` queue until the global ``done`` flag is
    set. Frames arrive already scaled and colour-converted, so they are
    written as-is.

    Args:
        outFile: Path of the video file to create.
        fps: Frame rate of the video.
        size: ``(width, height)`` of the video frames in pixels.
    """
    global done, frames, framesDone, framesBar, out

    # Create the video writer
    out = cv2.VideoWriter(outFile, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)

    try:
        while not done:
            # Wait with a timeout so the `done` flag is re-checked regularly;
            # a plain blocking get() would never return if the flag is set
            # while the queue is empty, leaving this thread stuck forever.
            try:
                frame = frames.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                out.write(frame[1])
            except Exception as e:
                print(f"images/{frame[0]}\n{e}")

            # Count the frame as handled even if writing it failed
            framesDone += 1
            framesBar.update()
            frames.task_done()

        print("Done")
    finally:
        # Always finalise the file, even if the loop dies unexpectedly
        out.release()
def createTimeLapse(x, y, width, height, scale, fps, threadsCount, out):
    """Build a timelapse video from the files in the ``images`` directory.

    Every file in ``images`` (in sorted name order) is queued, processed by
    ``threadsCount`` ``frameProcessor`` threads and written to the video by a
    single ``addFrame`` thread. The call returns once all threads finished.

    Args:
        x: Left column of the crop region.
        y: Top row of the crop region.
        width: Width of the crop region in source pixels.
        height: Height of the crop region in source pixels.
        scale: Integer upscaling factor; the video is ``width * scale`` by
            ``height * scale`` pixels.
        fps: Frame rate of the video.
        threadsCount: Number of processing threads. Values below 1 are
            treated as 1, because with no worker the queue would never drain
            and this function would block forever.
        out: Path of the video file to create.
    """
    global done, todoBar, framesBar
    done = False

    # Crop size in source pixels, and the resulting video frame size
    size = (width, height)
    scaleSize = (width * scale, height * scale)

    # Get the list of images in name order
    files = sorted(os.listdir("images"))

    # Both progress bars run up to the number of images
    todoBar.total = len(files)
    framesBar.total = len(files)

    # Queue every file, keyed by its name up to the first dot
    for file in files:
        todo.put((file.split(".")[0], file))

    # Start the processing threads (at least one, see docstring)
    workerCount = max(1, threadsCount)
    threads = []
    for _ in range(workerCount):
        worker = threading.Thread(target=frameProcessor, daemon=True, args=(x, y, size, scale))
        worker.start()
        threads.append(worker)

    # Start the video writing thread
    writer = threading.Thread(target=addFrame, daemon=True, args=(out, fps, scaleSize))
    writer.start()

    # Block until every queued image has been processed
    todo.join()

    # Wait for the writer to take the last frame, then raise the stop flag
    # right away so the writer sees it at its next loop check.
    # NOTE(review): this spins on purpose to keep the delay before `done` is
    # set as short as possible; a sentinel-based shutdown would avoid the
    # busy-wait but needs the writer loop to cooperate.
    while not frames.empty():
        pass
    done = True

    # Join all processing threads back to main
    for t in threads:
        t.join()

    # Join the writing thread back to main
    writer.join()
if __name__ == "__main__":
    # Read the command-line options, then run the pipeline with them
    args = parser.parse_args()
    createTimeLapse(
        x=args.x,
        y=args.y,
        width=args.width,
        height=args.height,
        scale=args.scale,
        fps=args.fps,
        threadsCount=args.threads,
        out=args.out,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment