#!/usr/bin/python3
# Source: GitHub gist dasl-/1b166916ee0268bbd37c9bed132197d4 by @dasl-,
# created May 28, 2019.
import numpy as np
import cv2
import pafy
import pprint
import random
import time
import argparse
import math
import os
import time
import keyboard
import sys
from multiprocessing import Process, Pool
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
import sharedmem
#######################################################################################
# ex 1
#######################################################################################
# class Point(Structure):
# _fields_ = [('x', c_double), ('y', c_double)]
# def modify(n, x, s, A):
# print("yo")
# n.value **= 2
# x.value **= 2
# s.value = s.value.upper()
# for a in A:
# a.x **= 2
# a.y **= 2
# return 1
# def m(A, i):
# time.sleep(random.randint(0,5))
# print(i)
# A[i] = (9.0,8.0)
# if __name__ == '__main__':
# n = Value('i', 7)
# x = Value(c_double, 1.0/3.0, lock=False)
# s = Array('c', b'hello world', lock=False)
# A = Array(Point, [(1.0,-6.5), (-5.5,2.0), (2.3,9.5)], lock=False)
# # p = Process(target=modify, args=(n, x, s, A))
# # print(n.value)
# # print(x.value)
# # print(s.value)
# # print([(a.x, a.y) for a in A])
# p1 = Process(target=m, args=(A, 1))
# p2 = Process(target=m, args=(A, 2))
# p1.start()
# p2.start()
# print([(a.x, a.y) for a in A])
# p1.join()
# print([(a.x, a.y) for a in A])
# print("ok: " + str(p2.join()))
# print([(a.x, a.y) for a in A])
#######################################################################################
# ex 2
#######################################################################################
# def get_next_frame(vid_cap, num_skip_frames):
# success = True
# for x in range(num_skip_frames + 1): # add one because we need to call .grab() once even if we're skipping no frames.
# success = vid_cap.grab()
# if not success:
# return False, None
# success, frame = vid_cap.retrieve()
# if not success:
# return False, None
# return success, frame
# def vid(start_frame):
# vid_cap = cv2.VideoCapture("/home/pi/lightness/lightness_data/The Smashing Pumpkins - [email protected]")
# print(start_frame)
# display_width = 28
# display_height = 18
# avg_color_frames = []
# fps = vid_cap.get(cv2.CAP_PROP_FPS)
# start = time.time()
# success, frame = get_next_frame(vid_cap, start_frame)
# if not success:
# return avg_color_frames
# while (True):
# frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
# frame_width = vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH)
# frame_height = vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
# slice_height = (frame_height / display_height)
# slice_width = (frame_width / display_width)
# avg_color_frame = np.zeros((display_width, display_height), np.uint8)
# for x in range(display_width):
# for y in range(display_height):
# mask = np.zeros(frame.shape[:2], np.uint8)
# start_y = int(round(y * slice_height, 0))
# end_y = int(round((y + 1) * slice_height, 0))
# start_x = int(round(x * slice_width, 0))
# end_x = int(round((x + 1) * slice_width, 0))
# mask[start_y:end_y, start_x:end_x] = 1
# avg_color_frame[x, y] = cv2.mean(frame, mask)[0]
# print(vid_cap.get(cv2.CAP_PROP_POS_MSEC))
# print(frame.shape)
# print(vid_cap.get(cv2.CAP_PROP_FPS))
# #pprint.pprint(avg_color_frame)
# avg_color_frames.append(avg_color_frame)
# success, frame = get_next_frame(vid_cap, 3)
# if not success:
# break
# end = time.time()
# print("processing video took: " + str(end - start) + " seconds")
# vid_cap.release()
# return avg_color_frames
# if __name__ == '__main__':
# with Pool(processes = 4) as p:
# all_avg_color_frames = p.map(vid, [0, 1, 2, 3])
# np.save(sys.path[0] + "/lightness_data/parallel_today_all", all_avg_color_frames)
#######################################################################################
# ex 3
#######################################################################################
def get_next_frame(vid_cap, num_skip_frames):
    """Skip ``num_skip_frames`` frames on ``vid_cap`` and decode the next one.

    Args:
        vid_cap: Capture object exposing ``grab()`` and ``retrieve()``
            (callers in this file pass a ``cv2.VideoCapture``).
        num_skip_frames: Number of frames to discard before the frame that
            is returned. Values below zero are treated as zero.

    Returns:
        ``(success, frame)`` with a truthy ``success`` when a frame was
        decoded, or ``(False, None)`` as soon as a ``grab()`` or the final
        ``retrieve()`` reports failure.
    """
    # Add one because .grab() must be called once even when no frames are
    # skipped; clamping keeps that true for negative skip counts as well.
    for _ in range(max(num_skip_frames, 0) + 1):
        if not vid_cap.grab():
            return False, None
    success, frame = vid_cap.retrieve()
    if not success:
        return False, None
    return success, frame
def vid(start_frame, num_processes):
    """Fill this worker's share of the module-level ``avg_color_frames``.

    Starting at frame ``start_frame`` and stepping by ``num_processes``, each
    decoded frame is converted to grayscale, averaged down to a
    ``display_width`` x ``display_height`` grid (both taken from the shape of
    ``avg_color_frames``) and stored at ``avg_color_frames[frame_index]``.

    Args:
        start_frame: Index of the first frame this worker handles.
        num_processes: Stride between the frames this worker handles; the
            caller in this file passes the total number of workers.

    Raises:
        ValueError: If ``num_processes`` is less than 1.
    """
    if num_processes < 1:
        # A stride below one would never advance past the current frame.
        raise ValueError("num_processes must be at least 1")

    # start_frame, num_processes = args
    print(str(start_frame) + " " + str(num_processes))
    # NOTE(review): the file name below looks mangled (e-mail obfuscation in
    # the page this was copied from?) -- confirm the real path.
    vid_cap = cv2.VideoCapture("/home/pi/lightness/lightness_data/The Smashing Pumpkins - [email protected]")
    try:
        num_frames, display_width, display_height = avg_color_frames.shape
        frame_width = vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        frame_height = vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        slice_height = frame_height / display_height
        slice_width = frame_width / display_width

        # Cell boundaries depend only on the frame geometry, so compute them
        # once instead of once per cell per frame.
        x_edges = [int(round(col * slice_width)) for col in range(display_width + 1)]
        y_edges = [int(round(row * slice_height)) for row in range(display_height + 1)]

        current_frame = start_frame
        start = time.time()
        success, frame = get_next_frame(vid_cap, start_frame)
        if not success:
            return

        # Stop at num_frames: the buffer was sized from the reported frame
        # count, so any frame decoded beyond it has no slot to be stored in.
        while current_frame < num_frames:
            # OpenCV hands decoded frames back in BGR channel order.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Zeros (not empty) so a zero-sized cell yields 0, which is what
            # cv2.mean() returned for an all-zero mask.
            avg_color_frame = np.zeros((display_width, display_height), np.uint8)
            for x in range(display_width):
                for y in range(display_height):
                    cell = frame[y_edges[y]:y_edges[y + 1], x_edges[x]:x_edges[x + 1]]
                    if cell.size:
                        avg_color_frame[x, y] = cell.mean()
            if current_frame % 100 == 0:
                print(vid_cap.get(cv2.CAP_PROP_POS_MSEC))
            avg_color_frames[current_frame] = avg_color_frame

            success, frame = get_next_frame(vid_cap, num_processes - 1)
            if not success:
                break
            current_frame += num_processes

        end = time.time()
        print("processing video took: " + str(end - start) + " seconds")
    finally:
        # Release on every path, including the early return above.
        vid_cap.release()
# Driver: size a shared buffer from the video's reported frame count, let
# `num_processes` background workers fill interleaved frames, then save it.
# NOTE(review): the file name below looks mangled (e-mail obfuscation in the
# page this was copied from?) -- confirm the real path.
vid_cap = cv2.VideoCapture("/home/pi/lightness/lightness_data/The Smashing Pumpkins - [email protected]")
num_frames = int(vid_cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = vid_cap.get(cv2.CAP_PROP_FPS)
# Only the metadata above is read from this capture, so release it before the
# workers are started rather than holding it open for the whole run.
vid_cap.release()
# num_frames = 10
print(str(num_frames))

display_width = 28
display_height = 18
avg_color_frames = sharedmem.empty((num_frames, display_width, display_height), dtype=np.uint8)
# Workers only write the frames they manage to decode; zero the buffer so any
# frame left unwritten is saved as zeros (presumably sharedmem.empty, like
# np.empty, makes no promise about initial contents -- confirm).
avg_color_frames[:] = 0

num_processes = 2
# `args` is only consumed by the commented-out MapReduce variant below.
args = [(i, num_processes) for i in range(num_processes)]
processes = [
    sharedmem.background(function=vid, start_frame=i, num_processes=num_processes)
    for i in range(num_processes)
]
print("yo")
for process in processes:
    process.wait()
# with sharedmem.MapReduce(np = num_processes) as pool:
#     r = pool.map(func=vid, sequence=args)
# print("omfg")
print(str(avg_color_frames))
np.save(sys.path[0] + "/lightness_data/parallel_today", avg_color_frames)
# End of gist.