Created
January 9, 2023 06:53
-
-
Save peterw/c9c79858da1b8a2baa53c83008aefed1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import re
import tempfile
import uuid
from time import sleep
from urllib.parse import urlparse

import gizeh
import moviepy.video.fx.all as vfx
import mux_python
import requests
from moviepy.editor import *
from moviepy.editor import VideoFileClip, ImageClip, CompositeVideoClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from mux_python.rest import ApiException
def lambda_handler(event, context):
    """AWS Lambda entry point: render a two-part picture-in-picture video.

    Downloads two page screenshots and two Loom recordings into a temporary
    directory, overlays each recording (masked to a circle) on the
    bottom-right corner of its screenshot, pads the narrower composite so
    both have the same width, concatenates them and writes the result to
    ``final.mp4`` inside the temporary directory.

    Args:
        event: Lambda invocation payload (not read).
        context: Lambda runtime context (not read).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded string ``body``.

    Raises:
        RuntimeError: a screenshot request did not return HTTP 200.
        ValueError: a Loom share link contains no video id.
        requests.HTTPError: a Loom request returned an error status.
    """
    # Seconds to wait on any single HTTP request, so a stalled download
    # fails on its own instead of running into the Lambda timeout.
    request_timeout = 30

    # NOTE(review): everything, including the rendered video, lives in this
    # temporary directory and is deleted when the ``with`` block exits.
    # Nothing uploads FINAL_VID yet.
    with tempfile.TemporaryDirectory() as tmpdir:
        VID1 = os.path.join(tmpdir, "loom.mp4")
        VID2 = os.path.join(tmpdir, "loom1.mp4")
        PIC1 = os.path.join(tmpdir, "image1.png")
        PIC2 = os.path.join(tmpdir, "image2.png")
        MASK_1_PNG = os.path.join(tmpdir, "mask1.png")
        MASK_2_PNG = os.path.join(tmpdir, "mask2.png")
        FINAL_VID = os.path.join(tmpdir, "final.mp4")

        def create_circle(size, name):
            """Write a PNG of ``size`` (w, h) with a centred filled circle."""
            width, height = size
            surface = gizeh.Surface(width, height)
            circle = gizeh.circle(
                r=min(width, height) // 2,
                fill=(2, 2, 2),
                xy=(width // 2, height // 2),
            )
            circle.draw(surface)
            surface.write_to_png(name)

        def apply_circle_mask(clip, mask_png):
            """Return ``clip`` masked by a circle image written to ``mask_png``."""
            # Generate a mask PNG matching the clip's size.
            create_circle(clip.size, mask_png)
            mask = ImageClip(mask_png, transparent=True, ismask=True)
            mask = mask.set_duration(clip.duration)
            mask = mask.set_fps(15)
            # Convert the still mask image into a mask video clip.
            mask = CompositeVideoClip(
                [mask], bg_color=(0), size=clip.size, ismask=True
            )
            clip.mask = mask
            return concatenate_videoclips([clip], method='compose')

        def generate_video():
            """Compose both picture-in-picture segments and write FINAL_VID."""
            print('starting to process the video with moviepy')
            # The context managers close the underlying file readers once
            # the final video has been written (or on any failure).
            with VideoFileClip(VID1) as source1, VideoFileClip(VID2) as source2:
                vid1 = source1.resize(0.6)  # 60% of its original size
                vid2 = source2.resize(height=vid1.h)

                vid1 = apply_circle_mask(vid1, MASK_1_PNG)
                vid2 = apply_circle_mask(vid2, MASK_2_PNG)

                # Use each screenshot as the background of its recording.
                image_clip1 = ImageClip(PIC1, duration=vid1.duration)
                image_clip2 = ImageClip(PIC2, duration=vid2.duration)
                image_clip2 = image_clip2.resize(height=image_clip1.h)
                result1 = CompositeVideoClip(
                    [image_clip1, vid1.set_position(("right", "bottom"))],
                    use_bgclip=True,
                )
                result2 = CompositeVideoClip(
                    [image_clip2, vid2.set_position(("right", "bottom"))],
                    use_bgclip=True,
                )

                # Both segments must have the same size before joining:
                # heights already match, so pad the narrower one with black
                # bars on the left and right (extra pixel goes to the right).
                width_gap = abs(image_clip1.w - image_clip2.w)
                if width_gap:
                    left = width_gap // 2
                    right = width_gap - left
                    if image_clip1.w > image_clip2.w:
                        result2 = result2.margin(left=left, right=right)
                    else:
                        result1 = result1.margin(left=left, right=right)
                print(result1.size, result2.size)

                final = concatenate_videoclips([result1, result2])
                final.write_videofile(FINAL_VID)

        def download_pic(url, file_name):
            """Download ``url`` once and save the response body to ``file_name``."""
            # NOTE(review): this logs the full URL, including its access key.
            print('downloading picture ' + url)
            response = requests.get(url, timeout=request_timeout)
            if response.status_code != 200:
                print('picture url was ERRORED')
                # Fail here with a clear cause instead of later, when the
                # missing image file is opened.
                raise RuntimeError(
                    'picture download failed with HTTP status '
                    + str(response.status_code)
                )
            with open(file_name, "wb") as file:
                file.write(response.content)

        def download_loom(videoLink, vid_name):
            """Resolve a Loom share link and stream the video to ``vid_name``."""
            # Take the id from the URL path only, so a query string
            # or a trailing slash on the share link does not end up in the id.
            video_id = urlparse(videoLink).path.rstrip("/").rsplit("/", 1)[-1]
            if not video_id:
                raise ValueError('no Loom video id in link: ' + videoLink)
            transcoded_url = (
                "https://www.loom.com/api/campaigns/sessions/"
                + video_id + "/transcoded-url"
            )
            lookup = requests.post(transcoded_url, timeout=request_timeout)
            lookup.raise_for_status()
            video_url = lookup.json()['url']
            print("Downloading...")
            # Stream to disk in 1 MiB chunks rather than buffering the whole
            # video in memory.
            with requests.get(
                video_url, stream=True, timeout=request_timeout
            ) as download:
                download.raise_for_status()
                with open(vid_name, 'wb') as f:
                    for chunk in download.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
            print('successfully downloaded loom video')

        # NOTE(review): the access key below is committed in source. Move it
        # to configuration (e.g. an environment variable) and rotate it.
        pic_url = 'https://api.apiflash.com/v1/urltoimage?access_key=02e0b0305fdc47d49c6a3bb7179f9358&url=https%3A%2F%2Ftwitter.com'
        pic2_url = 'https://api.apiflash.com/v1/urltoimage?access_key=02e0b0305fdc47d49c6a3bb7179f9358&url=https%3A%2F%2Freddit.com'
        download_pic(pic_url, PIC1)
        download_pic(pic2_url, PIC2)
        download_loom('https://www.loom.com/share/2ff13eb033584295b6389de7cdd1ae54', VID1)
        download_loom('https://www.loom.com/share/ac0957d8a75a4f9ba0b010be1ea457ae', VID2)
        generate_video()
        # NOTE(review): the body is the repr of a request to google.com,
        # which looks like a connectivity-check placeholder. Kept unchanged
        # so the response shape callers see does not change.
        return {
            'statusCode': 200,
            'body': json.dumps(str(requests.get('https://google.com')))
        }
First-look issue:
Lambda does not give you a writable directory structure like your local system; only the /tmp folder is writable.
# Change directory to the /tmp folder
os.chdir('/tmp')
# This is important
Let's discuss this further. Here's my Upwork profile.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Yo! Found your gist from Upwork. Can you share what error you are getting?