Created
July 8, 2017 09:20
-
-
Save waveform80/d18a598e0c8621ba98461a6986270633 to your computer and use it in GitHub Desktop.
The code used for https://www.youtube.com/watch?v=WnnKrVSMbng
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# WARNING: This probably won't work with current versions of the | |
# mmalobj layer; it was made while the layer was still being | |
# developed. | |
import io | |
import curses | |
import datetime as dt | |
from picamera import array, mmal, mmalobj as mo, PiCameraMMALError | |
from threading import Thread, Lock | |
from PIL import Image, ImageDraw, ImageFont | |
from time import sleep | |
from math import sin, cos, pi | |
from collections import namedtuple | |
class Coord(namedtuple('Coord', ('x', 'y'))): | |
@classmethod | |
def clock(cls, radians): | |
return Coord(sin(radians), -cos(radians)) | |
def __add__(self, other): | |
try: | |
return Coord(self.x + other[0], self.y + other[1]) | |
except TypeError: | |
return Coord(self.x + other, self.y + other) | |
def __sub__(self, other): | |
try: | |
return Coord(self.x - other[0], self.y - other[1]) | |
except TypeError: | |
return Coord(self.x - other, self.y - other) | |
def __mul__(self, other): | |
try: | |
return Coord(self.x * other[0], self.y * other[1]) | |
except TypeError: | |
return Coord(self.x * other, self.y * other) | |
def __truedev__(self, other): | |
try: | |
return Coord(self.x / other[0], self.y / other[1]) | |
except TypeError: | |
return Coord(self.x / other, self.y / other) | |
def __floordiv__(self, other): | |
try: | |
return Coord(self.x // other[0], self.y // other[1]) | |
except TypeError: | |
return Coord(self.x // other, self.y // other) | |
class Demo(mo.MMALPythonComponent): | |
def __init__(self): | |
super(Demo, self).__init__(outputs=2) | |
self._lock = Lock() | |
self._font = ImageFont.truetype('/usr/share/fonts/truetype/freefont/FreeSans.ttf', 24) | |
self.clock_enabled = False | |
self._clock_image = None | |
self.bar_enabled = False | |
self._bar_image = None | |
self._demo_thread = None | |
self._rec_image = None | |
self.recording = False | |
def _commit_port(self, port): | |
super(Demo, self)._commit_port(port) | |
if port.type == 'in': | |
if port.format.value != mmal.MMAL_ENCODING_I420: | |
raise PiCameraMMALError(mmal.MMAL_EINVAL, 'invalid format') | |
def _demo_run(self): | |
clock_face = Image.new('L', (100, 100)) | |
draw = ImageDraw.Draw(clock_face) | |
draw.ellipse([(0, 0), (99, 99)], outline=(255,)) | |
bar_outline = Image.new('L', (300, 30)) | |
draw = ImageDraw.Draw(bar_outline) | |
draw.rectangle([(0, 0), (249, 29)], outline=(255,)) | |
self._rec_image = Image.new('L', (100, 25)) | |
draw = ImageDraw.Draw(self._rec_image) | |
draw.text((0, 0), 'REC', (200,), self._font) | |
while self.enabled: | |
img = clock_face.copy() | |
draw = ImageDraw.Draw(img) | |
now = dt.datetime.now() | |
midnight = now.replace(hour=0, minute=0, second=0, microsecond=0) | |
timestamp = (now - midnight).total_seconds() | |
center = Coord(49, 49) | |
hour_pos = center + Coord.clock(2 * pi * (timestamp % 43200 / 43200)) * 30 | |
min_pos = center + Coord.clock(2 * pi * (timestamp % 3600 / 3600)) * 45 | |
sec_pos = center + Coord.clock(2 * pi * (timestamp % 60 / 60)) * 45 | |
draw.line([center, hour_pos], fill=(200,), width=2) | |
draw.line([center, min_pos], fill=(200,), width=2) | |
draw.line([center, sec_pos], fill=(200,), width=1) | |
with self._lock: | |
self._clock_image = img | |
img = bar_outline.copy() | |
draw = ImageDraw.Draw(img) | |
draw.rectangle([(2, 2), (2 + (247 * now.second / 60), 27)], | |
fill=(200,), outline=(200,)) | |
draw.text((254, 2), '%d%%' % (now.second * 100 / 60), (200,), | |
self._font) | |
with self._lock: | |
self._bar_image = img | |
sleep(0.2) | |
def _callback(self, port, buf): | |
target1 = self.outputs[0].get_buffer(False) | |
if self.recording: | |
target2 = self.outputs[1].get_buffer(False) | |
else: | |
target2 = None | |
if target1: | |
target1.copy_from(buf) | |
with target1 as data: | |
img = Image.frombuffer( | |
'L', self.inputs[0].framesize, data, | |
'raw', 'L', 0, 1) | |
img.readonly = 0 | |
if self.recording: | |
img.paste(self._rec_image, (1180, 10), self._rec_image) | |
if self.clock_enabled and self._clock_image: | |
with self._lock: | |
img.paste( | |
self._clock_image, (10, 10), | |
self._clock_image) | |
if self.bar_enabled and self._bar_image: | |
with self._lock: | |
img.paste( | |
self._bar_image, (10, 680), | |
self._bar_image) | |
if target2: | |
target2.replicate(target1) | |
try: | |
self.outputs[0].send_buffer(target1) | |
except PiCameraMMALError as e: | |
if e.status != mmal.MMAL_EINVAL: | |
raise | |
return True | |
if target2: | |
try: | |
self.outputs[1].send_buffer(target2) | |
except PiCameraMMALError as e: | |
if e.status != mmal.MMAL_EINVAL: | |
raise | |
return True | |
return False | |
def enable(self): | |
super(Demo, self).enable() | |
self._demo_thread = Thread(target=self._demo_run) | |
self._demo_thread.daemon = True | |
self._demo_thread.start() | |
def disable(self): | |
super(Demo, self).disable() | |
if self._demo_thread: | |
self._demo_thread.join() | |
self._demo_thread = None | |
with self._lock: | |
self._clock_image = None | |
def main(window): | |
global camera, preview, encoder, transform | |
camera = mo.MMALCamera() | |
preview = mo.MMALRenderer() | |
encoder = mo.MMALVideoEncoder() | |
transform = Demo() | |
# Configure camera output 0 | |
camera.outputs[0].framesize = (1280, 720) | |
camera.outputs[0].framerate = 24 | |
camera.outputs[0].commit() | |
# Configure H.264 encoder | |
encoder.outputs[0].format = mmal.MMAL_ENCODING_H264 | |
encoder.outputs[0].bitrate = 1000000 | |
encoder.outputs[0].commit() | |
p = encoder.outputs[0].params[mmal.MMAL_PARAMETER_PROFILE] | |
p.profile[0].profile = mmal.MMAL_VIDEO_PROFILE_H264_HIGH | |
p.profile[0].level = mmal.MMAL_VIDEO_LEVEL_H264_41 | |
encoder.outputs[0].params[mmal.MMAL_PARAMETER_PROFILE] = p | |
encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_INLINE_HEADER] = True | |
encoder.outputs[0].params[mmal.MMAL_PARAMETER_INTRAPERIOD] = 30 | |
encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_INITIAL_QUANT] = 22 | |
encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_MAX_QUANT] = 22 | |
encoder.outputs[0].params[mmal.MMAL_PARAMETER_VIDEO_ENCODE_MIN_QUANT] = 22 | |
encoder.inputs[0].params[mmal.MMAL_PARAMETER_VIDEO_IMMUTABLE_INPUT] = True | |
output = io.open('output.h264', 'wb') | |
def output_callback(port, buf): | |
output.write(buf.data) | |
return bool(buf.flags & mmal.MMAL_BUFFER_HEADER_FLAG_EOS) | |
# Connect everything up | |
transform.connect(camera.outputs[0]) | |
preview.connect(transform.outputs[0]) | |
encoder.connect(transform.outputs[1]) | |
encoder.outputs[0].enable(output_callback) | |
try: | |
annotate = camera.control.params[mmal.MMAL_PARAMETER_ANNOTATE] | |
window.nodelay(True) | |
text = '' | |
while True: | |
key = window.getch() | |
if 0x20 <= key < 0x7f: | |
text += chr(key) | |
elif key == ord('\n'): | |
text = '' | |
elif key == curses.KEY_BACKSPACE: | |
text = text[:-1] | |
elif key == curses.KEY_HOME: | |
transform.clock_enabled = not transform.clock_enabled | |
elif key == curses.KEY_PPAGE: | |
transform.bar_enabled = not transform.bar_enabled | |
elif key == curses.KEY_NPAGE: | |
transform.recording = not transform.recording | |
elif key == curses.KEY_END: | |
break | |
else: | |
continue | |
annotate.enable = 1 | |
annotate.text = text.encode('ascii') | |
camera.control.params[mmal.MMAL_PARAMETER_ANNOTATE] = annotate | |
finally: | |
preview.disconnect() | |
encoder.disconnect() | |
transform.disconnect() | |
#main(None) | |
curses.wrapper(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment