Skip to content

Instantly share code, notes, and snippets.

@allyourcode
Last active July 30, 2018 00:36
Show Gist options
  • Select an option

  • Save allyourcode/c91b29a1365b24c1f76ce93860c1c8ce to your computer and use it in GitHub Desktop.

Select an option

Save allyourcode/c91b29a1365b24c1f76ce93860c1c8ce to your computer and use it in GitHub Desktop.
Tools forbreaking a video down into a series of still images.
import sys
import time
import opc
HIGH = 64
WHITE = (HIGH, HIGH, HIGH)
BLACK = (0, 0, 0)
RED = (HIGH, 0, 0)
GREEN = (0, HIGH, 0)
BLUE = (0, 0, HIGH)
def main():
pixels = []
for k in xrange(1, 34+1):
pixels.append(RED)
pixels.extend([WHITE] * (k-1))
pixels.extend([BLACK] * (256 - k))
opc_client = opc.Client('192.168.7.2:7890')
while True:
opc_client.put_pixels(pixels)
time.sleep(1.0)
if __name__ == '__main__':
main()
import sys
import time
from lightwave import display as lightwave_display
import opc
HIGH = 127
WHITE = (HIGH, HIGH, HIGH)
BLACK = (0, 0, 0)
RED = (HIGH, 0, 0)
GREEN = (0, HIGH, 0)
BLUE = (0, 0, HIGH)
def main():
display = lightwave_display.LightwaveDisplay()
# for outer_i in range(3):
# color = [RED, GREEN, BLUE][outer_i]
# for outer_j in range(12):
# panel_index = 12 * outer_i + outer_j
# print '%2d' % panel_index,
# for n in range(panel_index+1):
# if n == panel_index:
# color = WHITE
# inner_i, inner_j = divmod(n, 12)
# p = display._panels[outer_i][outer_j]
# p[inner_i, inner_j] = color
# print
# expected = 12 * 3 * 16 * 16
# assert len(pixels) == expected, (len(pixels), expected)
display[0, 48] = WHITE
display[0, 49] = WHITE
display[1, 48] = WHITE
display[1, 49] = WHITE
pixels = display.pixel_list
opc_client = opc.Client('192.168.7.2:7890')
while True:
opc_client.put_pixels(pixels)
time.sleep(1.0 / 30)
if __name__ == '__main__':
main()
>>> import numpy
>>> import display
>>> p = display.Panel((4,3))
>>> k = 0
>>> for i in xrange(4):
... for j in xrange(3):
... p[i,j] = k
... k += 1
...
>>> red = []
>>> for r, b, g in p.serpentine_order:
... assert r == g == b
... red.append(r)
...
>>> red
[2, 1, 0, 3, 4, 5, 8, 7, 6, 9, 10, 11]
>>> d = display.Display((2,2), (4, 3))
>>> h = 2 * 4
>>> w = 2 * 3
>>> k = 0
>>> for i in xrange(h):
... for j in xrange(w):
... d[i, j] = numpy.array([k,k,k], int)
... k += 1
...
>>> s = d.strings
>>> numpy.array(s.next())[:,0]
array([ 2, 1, 0, 6, 7, 8, 14, 13, 12, 18, 19, 20, 5, 4, 3, 9, 10,
11, 17, 16, 15, 21, 22, 23])
>>> numpy.array(s.next())[:,0]
array([26, 25, 24, 30, 31, 32, 38, 37, 36, 42, 43, 44, 29, 28, 27, 33, 34,
35, 41, 40, 39, 45, 46, 47])
import numpy
COLOR_DIMENSION_COUNT = 3
# TODO(danielwong): PanelInterface.
class Panel:
""".
Addressing pixels in a panel is very unnatural, because they are
positioned in horizontal serpentine order. That is, if a row ends
on the right, the next row begins on the right (similar vs. left).
The first pixel is in the top right.
"""
STANDARD_HEIGHT = 16
STANDARD_WIDTH = 16
STANDARD_SHAPE = STANDARD_HEIGHT, STANDARD_WIDTH
def __init__(self, shape=STANDARD_SHAPE):
self._shape = shape
shape = (self.height, self.width, COLOR_DIMENSION_COUNT)
self._colors = numpy.zeros(shape, int)
@property
def height(self):
return self._shape[0]
@property
def width(self):
return self._shape[1]
def __setitem__(self, inner_coordinates, new_color):
self._colors[inner_coordinates] = new_color
@property
def serpentine_order(self):
result = numpy.array(self._colors)
result[::2] = result[::2, ::-1]
new_shape = (self.width * self.height, COLOR_DIMENSION_COUNT)
return map(tuple, result.reshape(new_shape))
RightToLeftPanel = Panel
class TopToBottomRightToLeftPanel:
def __init__(self):
shape = Panel.STANDARD_SHAPE + (COLOR_DIMENSION_COUNT,)
self._colors = numpy.zeros(shape)
@property
def height(self):
return Panel.STANDARD_HEIGHT
@property
def width(self):
return Panel.STANDARD_WIDTH
def __setitem__(self, inner_coordinates, new_color):
i, j = inner_coordinates
self._colors[i, j] = new_color
@property
def serpentine_order(self):
result = numpy.rot90(self._colors)
# Reverse the odd rows.
result[1::2] = result[1::2, ::-1]
return map(tuple, result.reshape(
(self.height * self.width, COLOR_DIMENSION_COUNT)))
class VirtualPanel:
def __setitem__(self, inner_coordinates, new_color):
# This function intentionally left blank.
pass
@property
def serpentine_order(self):
return []
class Display:
""".
Addressing pixels is very unnatural, because of the pixel order in
a panel. See the Panel class for details.
"""
STANDARD_HEIGHT = 3
STANDARD_WIDTH = 12
STANDARD_SHAPE = (STANDARD_HEIGHT, STANDARD_WIDTH)
def __init__(self, shape=STANDARD_SHAPE,
panel_shape=Panel.STANDARD_SHAPE):
self._shape = shape
self._panel_shape = panel_shape
self._panels = []
for outer_i in xrange(self.height):
row = []
for outer_j in xrange(self.width):
row.append(Panel(self._panel_shape))
self._panels.append(row)
@property
def height(self):
return self._shape[0]
@property
def width(self):
return self._shape[1]
def __setitem__(self, pixel_coordinates, new_color):
i, j = pixel_coordinates
outer_i, inner_i = divmod(i, self._panel_shape[0])
outer_j, inner_j = divmod(j, self._panel_shape[1])
panel = self._panels[outer_i][outer_j]
panel[inner_i, inner_j] = new_color
@property
def strings(self):
for i in xrange(self.height):
for j in xrange(0, self.width, 2):
first = self._panels[i][j].serpentine_order
second = self._panels[i][j + 1].serpentine_order
yield first + second
RectangularDisplay = Display
class LightwaveDisplay:
def __init__(self):
row_0 = [RightToLeftPanel() for _ in xrange(10)]
row_0 = [VirtualPanel()] + row_0 + [VirtualPanel()]
for j in [3, 5, 7]:
row_0[j] = TopToBottomRightToLeftPanel()
assert len(row_0) == 12
row_1 = (
[RightToLeftPanel() for x in xrange(12)]
)
row_1[3] = TopToBottomRightToLeftPanel()
assert len(row_1) == 12
row_2 = (
[VirtualPanel()] +
[RightToLeftPanel() for x in xrange(10)] +
[VirtualPanel()]
)
assert len(row_1) == 12
self._panels = [row_0, row_1, row_2]
@property
def height(self):
return 3
@property
def width(self):
return 12
def __setitem__(self, pixel_coordinates, new_color):
i, j = pixel_coordinates
h, w = Panel.STANDARD_SHAPE # TODO(danielwong): Bad names.
outer_i, inner_i = divmod(i, h)
outer_j, inner_j = divmod(j, w)
panel = self._panels[outer_i][outer_j]
panel[inner_i, inner_j] = new_color
@property
def pixel_list(self):
result = []
def pad():
result.extend([(0, 0, 0)]* (16* 16))
row_0, row_1, row_2 = self._panels
# row_0
result.extend(row_0[1].serpentine_order)
pad()
for p in row_0[2:]:
result.extend(p.serpentine_order)
pad()
for p in row_1:
result.extend(p.serpentine_order)
result.extend(row_2[1].serpentine_order)
pad()
for p in row_2[2:]:
result.extend(p.serpentine_order)
pad() # This isn't really necessary. Just good form.
brightness = 0.5
return [(brightness * r, brightness * g, brightness * b) for r, g, b in result]
>>> import panelize
>>> import numpy
>>> a = numpy.array(range(12))
>>> a.resize(4, 3)
>>> panelize.reverse_columns_in_odd_rows(a)
>>> a
array([[ 0, 1, 2],
[ 5, 4, 3],
[ 6, 7, 8],
[11, 10, 9]])
from __future__ import print_function
import csv
import os
import sys
import cv2
def freak_out(msg):
print('Failed:', msg, file=sys.stderr)
sys.exit(1)
def divide_and_round_up(dividend, divisor):
q, r = divmod(dividend, divisor)
if r:
q += 1
return q
def reverse_columns_in_odd_rows(array):
array[1::2] = array[1::2, ::-1]
def main():
PANEL_WIDTH = 16
PANEL_HEIGHT = 16
COLOR_COMPONENT_COUNT = 3
# Inspect input.
source, = sys.argv[1:]
if not os.path.exists(source):
freak_out('File not found at %s' % source)
# Make output directory. Name it after input.
destination = os.path.splitext(source)[0]
if os.path.exists(destination):
freak_out('Destination already exists.')
os.mkdir(destination)
# Load file.
content = cv2.imread(source)
if content is None:
freak_out('Failed to load %s' % source)
row_count, column_count, _ = content.shape
# TODO(danielwong): Freak out if these are not divisible by
# PANEL_HEIGHT and PANEL_WIDTH respectively.
# Iterate over panels.
for i in xrange(0, row_count, PANEL_HEIGHT):
for j in xrange(0, column_count, PANEL_WIDTH):
slice_ = content[
i:i+PANEL_HEIGHT, j:j+PANEL_WIDTH]
w, h, _ = slice_.shape
reverse_columns_in_odd_rows(slice_)
panel_content = slice_.reshape(
w * h, COLOR_COMPONENT_COUNT)
panel_file_name = '%d_%d.csv' % (
i / PANEL_HEIGHT, j / PANEL_WIDTH)
with open(os.path.join(destination, panel_file_name),
'wb') as panel_file:
writer = csv.writer(panel_file)
for color in panel_content:
writer.writerow(map(str, color))
print('Success!', file=sys.stderr)
if __name__ == '__main__':
main()
from __future__ import division
import argparse
import random
import sys
import time
import cv2
import opc
from lightwave import display as lightwave_display
HEIGHT_PANEL_COUNT = 3
WIDTH_PANEL_COUNT = 12
def play_mp4(opc_client, video_file):
capture = cv2.VideoCapture(video_file)
ok, frame = capture.read()
if not ok:
# TODO(danielwong): Handle this.
return
display = lightwave_display.LightwaveDisplay()
# Figure out how much to scale such that minimal cropping occurs
# while at the same time, filling the whole display.
frame_height, frame_width, _ = frame.shape
scale = min(
frame_height // (HEIGHT_PANEL_COUNT * 16),
frame_width // (WIDTH_PANEL_COUNT * 16))
vertical_offset = (
frame_height - HEIGHT_PANEL_COUNT * 16 * scale) // 2
horizontal_offset = (
frame_width - WIDTH_PANEL_COUNT * 16 * scale) // 2
# Exit condition at the end of the loop body.
frame_number = 0
while True:
# Scale down.
for display_i in xrange(display.height * 16):
for display_j in xrange(display.width * 16):
frame_i = scale * display_i + vertical_offset
frame_j = scale * display_j + horizontal_offset
cell = frame[
frame_i:frame_i+scale, frame_j:frame_j+scale]
display[display_i, display_j] = cell[0,0]
opc_client.put_pixels(display.pixel_list)
ok, frame = capture.read()
frame_number += 1
if not ok:
return
# TODO(danielwong): More accurate timing.
# time.sleep(1.0 / 40.0)
def main():
PARSER = argparse.ArgumentParser(
description='Sends an mp4 to an Open Pixel Control server.')
PARSER.add_argument('--video_file', required=True)
PARSER.add_argument('--server_address',
default='192.168.7.2:7890')
# TODO(danielwong): Time out.
# TODO(danielwong): scaling_mode = (fit, fill, distort); default fill
args = PARSER.parse_args()
opc_client = opc.Client(args.server_address)
play_mp4(opc_client, args.video_file)
if __name__ == '__main__':
main()
from __future__ import print_function
import datetime
import os
import random
import string
import subprocess
import sys
import urlparse
def valid_url_or_die(url_string):
(scheme, netloc, path,
parameters, query, fragment) = urlparse.urlparse(url_string)
query = urlparse.parse_qs(query)
ok = (scheme == 'https' and
netloc == 'www.youtube.com' and
path == '/watch' and
parameters == '' and
query.keys() == ['v'] and
fragment == '')
if not ok:
print('Unsupported URL: %r' % url_string, file=sys.stderr)
sys.exit(1)
def main():
url, = sys.argv[1:]
valid_url_or_die(url)
# TODO(danielwong): Use the tempfile std lib module?
file_name = '/tmp/%s_%s.mp4' % (
datetime.datetime.now().isoformat(),
''.join(random.choice(string.ascii_letters) for _ in xrange(8)))
try:
# Download the video.
subprocess.check_call(
['youtube-dl', '-f', 'worst[ext=mp4]',
'-o', file_name,
# Prevent abuse.
'--max-filesize', '250m',
url])
# Send it to the Open Pixel Control server.
play_mp4_path = os.path.join(os.path.dirname(__file__),
'play_mp4.py')
subprocess.check_call([
'python', play_mp4_path, '--video_file', file_name,
'--height_panel_count', '2', '--width_panel_count', '4'])
finally:
os.remove(file_name)
if __name__ == '__main__':
main()
from __future__ import print_function
import csv
import os
import sys
import cv2
import numpy
# TODO(danielwong): These constants are used elsewhere
# (e.g. panelize). DRY! On the other hand, only depending on the std
# lib is ncie...
PANEL_WIDTH = 16
PANEL_HEIGHT = 16
COLOR_COMPONENT_COUNT = 3
def freak_out(msg):
print('Failed:', msg, file=sys.stderr)
sys.exit(1)
def main():
source, = sys.argv[1:]
# Make sure destination doesn't already exist.
destination = os.path.splitext(source)[0] + '.jpg'
if os.path.exists(destination):
freak_out('Destination already exists.')
out = numpy.zeros(
(PANEL_HEIGHT, PANEL_WIDTH, COLOR_COMPONENT_COUNT), int)
with open(source, 'rb') as panel_file:
reader = csv.reader(panel_file)
for k, color in enumerate(reader):
i, j = divmod(k, PANEL_WIDTH)
out[i,j] = map(int, color)
ok = cv2.imwrite(destination, out)
if __name__ == '__main__':
main()
# TODO(danielwong): Convert all .py files to py3k \m/
"""Does what the name says.
Does doctest.testfile(f) where f is a file whose name is of the form
foo.doctest
Except for .py files listed in NO_TESTS_YET, it is assumed that every
.py file has a corresponding .doctest file.
"""
from __future__ import print_function
import doctest
import os
import sys
NO_TESTS_YET = [
'run_all_tests.py',
'vid2frames.py',
'render_panel.py'
]
def main():
project_root = os.path.dirname(__file__) or os.path.curdir
for f in os.listdir(project_root):
if f in NO_TESTS_YET:
continue
if f.endswith('.py'):
doctest_file = os.path.splitext(f)[0] + '.doctest'
print('Running %r...' % doctest_file)
# DO NOT SUBMIT - Do something with the return value!
test_result = doctest.testfile(doctest_file)
if test_result.failed:
print(' FAILED %d (out of %d)'
% (test_result.failed, test_result.attempted))
elif not test_result.attempted:
print(' EMPTY')
else:
print(' PASSED all %d' % test_result.attempted)
if __name__ == '__main__':
main()
from __future__ import print_function
import os
import sys
import cv2
def freak_out(msg):
print('Failed:', msg, file=sys.stderr)
sys.exit(1)
def main():
# Inspect input.
source, = sys.argv[1:]
if not os.path.exists(source):
freak_out('Source does not exist.')
# Make output directory. Name it after input.
destination = os.path.splitext(source)[0]
if os.path.exists(destination):
freak_out('Destination already exists.')
os.mkdir(destination)
# Open input.
capture = cv2.VideoCapture(source)
frame_number = 0
# Read each frame, and write it to its own file.
while True:
ok, content = capture.read()
if not ok:
freak_out('Unable to read frame %d (0 is first frame).'
% frame_number)
ok = cv2.imwrite(os.path.join(destination,
'%d.jpg' % frame_number),
content)
if not ok:
freak_out('Unable to write frame %d (0 is first frame).'
% frame_number)
frame_number += 1
# Clean up.
cap.release()
print('Success!', file=sys.stderr)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment