Skip to content

Instantly share code, notes, and snippets.

@emmeowzing
Last active August 19, 2016 05:52
Show Gist options
  • Save emmeowzing/82407e6f8e8a97e27b2feb3b054ce972 to your computer and use it in GitHub Desktop.
Save emmeowzing/82407e6f8e8a97e27b2feb3b054ce972 to your computer and use it in GitHub Desktop.
Capture RPi camera images. This script is untested since I don't have a drone and RPi camera to test it on.
#! /usr/bin/env python2
# -*- coding: utf-8 -*-
""" Capure RPi camera images. This script is untested since I don't have a
drone and RPiCam to test it on.
"""
import io
import logging
import os
import sys
from glob import glob
from time import sleep

from serial import Serial, SerialException
logging.basicConfig(filename='/var/log/capture.log', level=logging.INFO)
log = logging.getLogger(__name__)
# For capturing image data - picamera is required: if it is not found we log
# the error and exit (no mplayer | ffmpeg fallback is implemented here)
try:
import picamera
except ImportError as I:
log.info("%s, fatal", I)
sys.exit(1)
# For knowing we're in the air and should start collecting data
import dronekit
__author__ = 'Brandon Doyle'
__email__ = '[email protected]'
BURST = 10
class NoSerialException(SerialException):
    """ Raised by connectToDrone when no usable serial port is available. """
def connectToDrone():
    """ Exactly as the name implies -- connect to the drone as hands-off as
    possible.

    Every device matching ``/dev/tty[A-Za-z]*`` is probed by opening and
    immediately closing it. The first device that opens is handed to
    ``dronekit.connect`` (57600 baud, ``wait_ready=True``).

    Returns:
        The vehicle object returned by ``dronekit.connect``.

    Raises:
        NoSerialException: if no candidate device exists, or if none of the
            candidates could be opened.
    """
    # Find serial port (so we don't have to tell it the platform or search
    # manually, etc.)
    candidates = glob('/dev/tty[A-Za-z]*')
    if not candidates:
        log.info('No serial ports found, initializing capture')
        raise NoSerialException('no serial ports found')

    for port in candidates:
        try:
            probe = Serial(port)
            probe.close()
        except (OSError, SerialException):
            # This device could not be opened; try the next candidate
            continue
        # `port` now names the first device that opened successfully
        break
    else:
        # The loop ran out of candidates without a single successful open.
        # Record that in the log instead of failing silently.
        log.info('None of the %d serial ports found could be opened',
                 len(candidates))
        raise NoSerialException('no serial port could be opened')

    # now connect to the PIXHAWK
    vehicle = dronekit.connect(port, baud=57600, wait_ready=True)
    log.info('connected to PIXHAWK')
    return vehicle
def image(**kwds):
    """ Build a decorator that attaches every keyword argument to the
    decorated function as an attribute, e.g. ``@image(count=0)`` gives the
    function a ``count`` attribute initialised to 0.
    """
    def decorate(f):
        '''
        Copy each name/value pair passed to image() onto f, then return
        the very same function object,
        https://docs.python.org/3/library/functions.html#setattr
        '''
        for name, value in kwds.items():
            setattr(f, name, value)
        return f
    return decorate
@image(count=0)
def capture(burst=1, form='png', stack=False):
    """ Capture image bursts with the RPi camera.

    Args:
        burst: number of images to take in this call.
        form: extension used when building the image file names.
        stack: when false, take ``burst`` separate stills, written to
            ``captures/image<N>.<form>``. When true, grab ``burst`` frames
            in one ``capture_sequence`` call, written to
            ``image<N>.<form>`` in the current directory.

    The running number ``<N>`` is kept in ``capture.count`` (set up by the
    ``image`` decorator), so file names continue across calls.
    """
    if not stack:
        # Create the output directory up front rather than reacting to a
        # failed write: the exception type raised for a missing directory
        # differs between Python versions.
        if not os.path.isdir('captures'):
            os.makedirs('captures')
        with picamera.PiCamera() as camera:
            camera.resolution = (1024, 768) # 0.78 MP
            # presumably camera warm-up time before the first shot -- confirm
            sleep(2)
            for _ in range(burst):
                camera.capture(
                    'captures/image{0}.{1}'.format(capture.count, form)
                )
                capture.count += 1
    else:
        # Handle stacking after capture in C program (again)
        # NOTE(review): unlike the branch above, these files are written
        # without the 'captures/' prefix -- confirm this is what the
        # downstream stacking step expects.
        with picamera.PiCamera() as camera:
            # Capture video frames and stack
            camera.resolution = (1024, 768)
            camera.framerate = 30
            sleep(2)
            camera.capture_sequence([
                'image{0}.{1}'.format(capture.count + i, form)
                for i in range(burst)
            ])
            capture.count += burst
    # iff we're going to use multiprocessing instead of a bash script
    #return ['captures/image{0}.{1}'.format(i, form) \
    #        for i in xrange(capture.count - burst, capture.count)]
@image(take_images=0)
def in_the_air(vehicle):
    """ From the data gathered through dronekit, decide if we're in the
    air yet.

    Args:
        vehicle: object whose ``mode`` attribute is either a string or an
            object with a ``name`` attribute. It is not touched at all when
            ``in_the_air.take_images`` is 1.

    Returns:
        1 if images should be taken, 0 otherwise.
    """
    # The override flag is set by the caller when no drone link is available
    if in_the_air.take_images == 1:
        return 1

    # Actually check using dronekit/mavlink
    # This could probably be made far more robust, but my initial
    # thoughts are that we don't want photos during sport or acro
    # mode, but Loiter is excellent for photography - we'll have
    # to test this to ensure that these are actually possible
    mode = vehicle.mode
    # Normalise the name so that both 'Alt Hold' style spellings and the
    # upper-case, underscore-separated names ('ALT_HOLD') are recognised.
    mode_name = str(getattr(mode, 'name', mode)).upper().replace(' ', '_')
    return 1 if mode_name in ('LOITER', 'ALT_HOLD', 'POSHOLD') else 0
def main(vehicle):
    """ Loop forever: whenever in_the_air(vehicle) is true, take a burst of
    BURST png images and log the running image count; otherwise wait ten
    seconds before checking again.

    Reads the module-level ``st`` flag to choose the capture mode.
    """
    while True:
        if not in_the_air(vehicle):
            # Nothing to photograph yet -- check again later
            sleep(10)
            continue
        # Capture images first
        capture(burst=BURST, form='png', stack=st)
        log.info('Captured %d images', capture.count)
        # Could probably expand this using multiprocessing,
        # but it might be easier to just spawn processes within the
        # bash script, as I've already written.
if __name__ == '__main__':
    try:
        v = connectToDrone()
    except NoSerialException:
        # Just continue and take captures _anyway_ - probably a horridly
        # designed script, but we're not worried about speed here, it just
        # has to flow and get the idea across: Capture data reliably
        v = None  # no vehicle; main() still needs a value to pass along
        in_the_air.take_images = 1
    # Following can (and probably _should_) be replaced by argument parser
    # A first argument of '1' selects stacked capture; a missing argument
    # means non-stacked capture instead of an IndexError.
    st = len(sys.argv) > 1 and sys.argv[1] == '1'
    main(v)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment