Skip to content

Instantly share code, notes, and snippets.

@ferreirafabio
Created September 11, 2018 10:14
Show Gist options
  • Save ferreirafabio/b0251579156b1159a33b02b9d9dea9cf to your computer and use it in GitHub Desktop.
Save ferreirafabio/b0251579156b1159a33b02b9d9dea9cf to your computer and use it in GitHub Desktop.
Augment videos by relative crop displacements in parallel
# Contributors: Jonas Rothfuss and Fabio Ferreira
import math
import os
import moviepy
import subprocess
import random
import glob
import multiprocessing
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from moviepy.editor import *
from joblib import Parallel, delayed
# Upper bound on the number of time intervals sampled per video; passed as
# max_num_suggestions to video_time_interval_suggestions.
NUM_TIME_CROPS = 1
# Number of parallel joblib jobs started by the script entry point (one per CPU reported by the OS).
NUM_CORES = multiprocessing.cpu_count()
def generate_video_name(source_video_name, target_format, relative_crop_displacement, time_interval):
    """
    Builds the output file name that encodes how a snippet was derived from its source video.
    The name has the form <source>_<width>x<height>_<displacement>_(<start>,<end>).avi

    :param source_video_name: name of the source video without file extension
    :param target_format: (width, height) of the output video
    :param relative_crop_displacement: crop displacement, rendered with two decimals
    :param time_interval: (start, end) in seconds, each rendered with one decimal; any two-element
    sequence is accepted
    :return: the file name (not a full path), always ending in '.avi'
    """
    width, height = target_format[0], target_format[1]
    # %-formatting only unpacks real tuples, so lists and other sequences are normalised first
    start_time, end_time = tuple(time_interval)
    return "%s_%sx%s_%.2f_(%.1f,%.1f).avi" % (
        source_video_name, width, height, relative_crop_displacement, start_time, end_time)
def kill_process(process):
    """
    Best-effort release of the resources held by a clip-like object.
    Closes process.reader, shuts down process.audio.reader and finally invokes the object's __del__.
    Each step is attempted on its own, so a failing or missing part (e.g. a clip whose audio is None)
    does not prevent the remaining steps from running. Errors are swallowed on purpose: this is
    cleanup code and must never abort the caller.

    :param process: object to clean up; attributes that are missing or None are skipped
    """
    def _best_effort(action):
        # cleanup must not raise; anything that goes wrong here is deliberately ignored
        try:
            action()
        except Exception:
            pass

    reader = getattr(process, 'reader', None)
    if reader is not None:
        _best_effort(lambda: reader.close())

    # audio can be present but None, so resolve the nested reader defensively
    audio_reader = getattr(getattr(process, 'audio', None), 'reader', None)
    if audio_reader is not None:
        _best_effort(lambda: audio_reader.close_proc())

    _best_effort(lambda: process.__del__())
def files_from_directory(dir_str, file_type):
    """
    Lists the base names of all files in a directory that match a glob pattern.

    :param dir_str: directory to search (not recursive)
    :param file_type: glob pattern for the file names, e.g. '*.avi'
    :return: sorted list of matching file names without their directory part; empty if nothing matches
    """
    pattern = os.path.join(dir_str, file_type)
    # glob yields entries in arbitrary order; sorting keeps runs reproducible
    return sorted(os.path.basename(path) for path in glob.glob(pattern))
def extract_subvideo(video_path, target_time_interval=(1, 4)):
    """ Opens the video at video_path and returns the part between start time and end time as a sub clip.
    The requested interval must lie strictly inside the video: if end_time is not smaller than the video's
    duration an AssertionError is raised (no shortened or untrimmed clip is returned). When any check fails
    after the video has been opened, the opened clip is released again before the error is propagated.
    :param video_path: specifies the full (absolute) path to the video file
    :param target_time_interval(x,y): x: start time in s (e.g. 1.0) y: end time in s (e.g. 4.0)
    :return: the trimmed sub video (the object returned by VideoFileClip.subclip)
    :raises AssertionError: if video_path is not a file, if start_time >= end_time, or if the video is
    too short for the requested interval
    """
    start_time, end_time = target_time_interval[0], target_time_interval[1]

    assert os.path.isfile(video_path), "video_path does not contain a file"
    assert start_time < end_time, "invalid target time interval - start_time must be smaller than end_time"

    clip = VideoFileClip(video_path)
    try:
        assert end_time < clip.duration, \
            "video too short to crop (duration=%.3f, end_time=%.3f)" % (clip.duration, end_time)
        sub_clip = clip.subclip(start_time, end_time)
        # ensure that sub_clip has the desired length
        assert abs(sub_clip.duration - (end_time - start_time)) < 0.001
    except Exception:
        # do not leave the freshly opened clip (and its ffmpeg reader) behind on failure
        kill_process(clip)
        raise

    # only the sub clip is returned; the caller is responsible for releasing it
    # (see the zombie-process cleanup done via kill_process / pkill by the callers)
    return sub_clip
def crop_and_resize_video_clip(video_path=None, video_file_clip=None, target_format=(128, 128), relative_crop_displacement=0.0):
    """
    Crops a video to a square along its longer side and resizes the result to target_format.
    With relative_crop_displacement == 0 the square is centered; -1 and 1 move it to the two ends of
    the longer side.

    :param video_path: specifies the full (absolute) path to the video file
    :param video_file_clip: provide a moviepy.VideoFileClip if this should be used instead of the video specified by video_path
    :param target_format: a tuple (width, height) specifying the dimensions of the returned video
    :param relative_crop_displacement: augmentation parameter in [-1,1], adjusts the clipping in either
    y (when width < height) or x (when width >= height) dimension
    :return: returns the cropped and resized clip
    :raises AssertionError: if neither video_path nor video_file_clip is given, if the displacement is
    outside [-1,1], or if video_path has to be used but is not a file
    """
    assert video_path or video_file_clip is not None, "either video_path or video_file_clip must be provided"
    assert (-1 <= relative_crop_displacement <= 1), "relative_crop_displacement must be in interval [-1,1]"

    if video_file_clip is not None:
        clip = video_file_clip
    else:
        assert os.path.isfile(video_path), "video_path must be a file"
        clip = VideoFileClip(video_path)

    width, height = clip.size
    # distance between the edge of the frame and a centered square crop, along the longer side
    centered_offset = math.floor(abs(width - height) / 2)
    offset = centered_offset + relative_crop_displacement * centered_offset

    # only the coordinate along the longer side is set; the other one is left as None
    if width >= height:
        x1, y1, size = offset, None, height
    else:
        x1, y1, size = None, offset, width

    clip_crop = moviepy.video.fx.all.crop(clip, x1=x1, y1=y1, width=size, height=size)
    return moviepy.video.fx.all.resize(clip_crop, newsize=target_format)
def generate_video_snippets(i, clip, file_names, output_dir, target_format, num_clips):
    """
    Generates randomly cropped, time-trimmed snippets for a single video. Intended to be run as one
    parallel job; it never raises for a failing video but reports the failure on stdout instead.

    :param i: index of this video, only used for the progress output
    :param clip: dict with the keys 'path' (path to the video file) and 'duration' (duration in seconds)
    :param file_names: names of already generated files; the video is skipped if its name (without
    extension) is contained in any of them
    :param output_dir: directory the snippets are written to
    :param target_format: a tuple (width, height) specifying the dimensions of the generated snippets
    :param num_clips: total number of videos, only used for the progress output
    """
    progress = '[%d of %d] ' % (i, num_clips)
    # placeholder so the error report at the bottom always has something to print, even if reading
    # clip['path'] itself is what fails
    video_path = clip
    try:
        video_path, duration = clip['path'], clip['duration']
        video_name = os.path.basename(video_path).replace('.mp4', '').replace('.avi', '')

        if any(video_name in existing for existing in file_names):
            print("Skipping video (already_exists): " + video_name)
            return

        interval_suggestions = video_time_interval_suggestions(duration, max_num_suggestions=NUM_TIME_CROPS)
        # a single time interval is compensated by drawing several random crops from it
        num_random_crops = 4 if len(interval_suggestions) == 1 else 1

        failed_crops = 0
        for time_interval in interval_suggestions:
            for _ in range(num_random_crops):
                sample_rel_crop_displacement = random.uniform(-0.7, 0.7)
                try:
                    prepare_and_store_video(video_path, output_dir, target_time_interval=time_interval,
                                            relative_crop_displacement=sample_rel_crop_displacement,
                                            target_format=target_format)
                except Exception as e:
                    failed_crops += 1
                    print('Failed to process video (' + str(video_path) + ') ---' + str(e))
                finally:
                    # kill leftover processes whose command line mentions this video; the argument
                    # list (no shell) keeps paths with spaces or shell metacharacters intact
                    try:
                        subprocess.call(["pkill", "-9", "-f", "--", video_path])
                    except OSError:
                        pass  # pkill not available - nothing more can be done here

        if failed_crops:
            print(progress + 'Processed video (' + str(video_path) + ') with %d failed crop(s)' % failed_crops)
        else:
            print(progress + 'Successfully processed video (' + str(video_path) + ')')
    except Exception as e:
        print(progress + 'Failed to process video (' + str(video_path) + ') ---' + str(e))
def video_time_interval_suggestions(video_duration, max_num_suggestions=4):
    """
    Suggests time intervals of 3 seconds length that lie inside a video of the given duration.
    Videos shorter than 4 seconds get one centered interval, videos shorter than 5 seconds get (1, 4),
    longer videos get up to max_num_suggestions intervals whose start times are spread evenly over the video.

    :param video_duration: duration of video in seconds, must be larger than 3
    :param max_num_suggestions: upper bound on the number of intervals returned for videos of 5 seconds or more
    :return: array of tuples representing time intervals [(t1_start, t1_end), (t2_start, t2_end), ...]
    :raises AssertionError: if the video is not longer than 3 seconds or no interval could be suggested
    """
    assert (video_duration > 3), "video too short to crop (duration <= 3 sec)"
    suggestions = []
    if video_duration < 4:
        # center the 3 second window; the margin is half of whatever the video has beyond 3 seconds
        margin = (video_duration - 3) / 2
        suggestions.append((margin, margin + 3))
    elif video_duration < 5:
        suggestions.append((1, 4))
    else:
        # at most one window per 2.5 seconds of the video, ignoring a 1 second margin on both ends
        num_suggestions = min(max_num_suggestions, int((video_duration - 2) // 2.5))
        left_margin = 1
        for i in range(num_suggestions):
            offset = (video_duration - 2) / num_suggestions * i
            suggestions.append((left_margin + offset, left_margin + offset + 3))
    assert len(suggestions) > 0
    assert all([t_end <= video_duration and abs(t_end - t_start - 3) < 0.001 for t_start, t_end in suggestions])
    return suggestions
def prepare_and_store_video(source_video_path, output_dir, target_time_interval, target_format=(128,128), relative_crop_displacement=0.0):
    """
    Trims the source video to target_time_interval, crops and resizes it and writes the result as a
    raw video file into output_dir. The clips opened along the way are released again, also when
    cropping or writing fails.

    :param source_video_path: specifies the full (absolute) path to the source video file
    :param output_dir: directory the resulting video is written to
    :param target_time_interval: tuple (start, end) in seconds specifying the part of the video to keep
    :param target_format: a tuple (width, height) specifying the dimensions of the written video
    :param relative_crop_displacement: augmentation parameter forwarded to crop_and_resize_video_clip
    :return: None
    :raises AssertionError: if target_time_interval is not a tuple (errors of the trimming, cropping
    and writing steps are propagated as well)
    """
    assert isinstance(target_time_interval, tuple), "provided target_time_interval is not a tuple"

    source_video_name = os.path.basename(source_video_path).replace('.mp4', '').replace('.avi', '')
    start_time, end_time = target_time_interval[0], target_time_interval[1]

    # do time trimming
    sub_clip = extract_subvideo(source_video_path, target_time_interval=(start_time, end_time))
    assert sub_clip is not None

    clip_resized = None
    try:
        if sub_clip.duration < (end_time - start_time):
            # skip video if it is shorter than specified
            print('Video too short, skipping.')
            # argument list instead of a shell string, so unusual characters in the path are harmless
            try:
                subprocess.call(["pkill", "-9", "-f", "--", source_video_path])
            except OSError:
                pass  # pkill not available - nothing more can be done here
            return None

        target_video_name = generate_video_name(source_video_name, target_format, relative_crop_displacement,
                                                target_time_interval)
        target_video_path = os.path.join(output_dir, target_video_name)

        clip_resized = crop_and_resize_video_clip(source_video_path, video_file_clip=sub_clip,
                                                  target_format=target_format,
                                                  relative_crop_displacement=relative_crop_displacement)

        print("writing video: " + target_video_path)
        clip_resized.write_videofile(target_video_path, codec='rawvideo', progress_bar=False, verbose=False)
    finally:
        # release the clips on every exit path, including failures while cropping or writing
        if clip_resized is not None:
            kill_process(clip_resized)
        kill_process(sub_clip)
if __name__ == '__main__':
    file_dir = "/somedir"
    target_format = (128, 128)

    # get files that were already processed; generate_video_snippets uses these names to skip videos
    file_names = files_from_directory(file_dir, '*.avi')

    # generate_video_snippets expects one dict per video with the keys 'path' and 'duration', so the
    # source videos are probed once up front.
    # NOTE(review): the snippets are written as .avi into file_dir, therefore only .mp4 files are
    # treated as source videos here - adjust the pattern if the sources live elsewhere.
    clips = []
    for source_path in sorted(glob.glob(os.path.join(file_dir, '*.mp4'))):
        try:
            probe = VideoFileClip(source_path)
        except Exception as e:
            print('Failed to open video (' + str(source_path) + ') ---' + str(e))
            continue
        clips.append({'path': source_path, 'duration': probe.duration})
        # the probe was only needed for the duration; release its reader right away
        kill_process(probe)

    num_clips = len(clips)

    Parallel(n_jobs=NUM_CORES)(
        delayed(generate_video_snippets)(i, clip, file_names, file_dir, target_format, num_clips)
        for i, clip in enumerate(clips, 1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment