Last active
August 19, 2016 05:52
-
-
Save emmeowzing/82407e6f8e8a97e27b2feb3b054ce972 to your computer and use it in GitHub Desktop.
Capture RPi camera images. This script is untested since I don't have a drone and RPi camera to test it on.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python2 | |
| # -*- coding: utf-8 -*- | |
| """ Capture RPi camera images. This script is untested since I don't have a | |
| drone and RPiCam to test it on. | |
| """ | |
| from glob import glob | |
| from serial import Serial, SerialException | |
| from time import sleep | |
| import io | |
| import sys | |
| import logging | |
# All diagnostics go to a log file; nothing is printed to the console.
logging.basicConfig(filename='/var/log/capture.log', level=logging.INFO)
log = logging.getLogger(__name__)

# For capturing image data. No fallback (e.g. mplayer | ffmpeg) is implemented
# here: if picamera cannot be imported the script logs the error and exits.
try:
    import picamera
except ImportError as err:
    # Fatal condition, so record it above INFO severity before exiting.
    log.critical("%s, fatal", err)
    sys.exit(1)

# For knowing we're in the air and should start collecting data
import dronekit

__author__ = 'Brandon Doyle'
__email__ = '[email protected]'

# Number of images requested per capture() call from main()
BURST = 10
class NoSerialException(SerialException):
    """ Raised when no serial port is available to reach the drone. """
def connectToDrone():
    """ Exactly as the name implies -- connect to the drone as hands-off as
    possible.

    Globs ``/dev/tty[A-Za-z]*`` and probes each candidate by opening and
    immediately closing it. The first port that opens is handed to
    ``dronekit.connect`` at 57600 baud with ``wait_ready=True``.

    Returns:
        The vehicle object returned by ``dronekit.connect``.

    Raises:
        NoSerialException: if no candidate port exists, or none of the
            candidates can be opened.
    """
    # Find serial port (so we don't have to tell it the platform or search
    # manually, etc.)
    ports = glob('/dev/tty[A-Za-z]*')
    if not ports:
        log.info('No serial ports found, initializing capture')
        raise NoSerialException('no serial ports found')

    for port in ports:
        try:
            # Open/close is only a probe that the port is usable.
            Serial(port).close()
        except (OSError, SerialException):
            continue
        # `port` stays bound to the first port that opened successfully.
        break
    else:
        # The loop ended without `break`: every candidate failed to open.
        log.info('None of the %d serial ports could be opened', len(ports))
        raise NoSerialException('no usable serial port')

    # now connect to the PIXHAWK
    vehicle = dronekit.connect(port, baud=57600, wait_ready=True)
    log.info('connected to PIXHAWK')
    return vehicle
def image(**kwds):
    """ Build a decorator that attaches every keyword argument to the
    decorated function as an attribute (e.g. ``@image(count=0)`` gives the
    function a ``count`` attribute initialised to 0).
    """
    def decorate(f):
        # Same object in, same object out -- only attributes are added, see
        # https://docs.python.org/3/library/functions.html#setattr
        for attr_name, attr_value in kwds.items():
            setattr(f, attr_name, attr_value)
        return f
    return decorate
@image(count=0)
def capture(burst=1, form='png', stack=False):
    """ Capture image bursts with the RPi camera.

    ``capture.count`` (seeded to 0 by the ``image`` decorator) is the running
    index used in the file names. It is advanced once per image, so
    successive calls do not reuse earlier names.

    Args:
        burst: number of images to take in this call.
        form: file extension used in the generated file names.
        stack: when false, images are taken one at a time and written to
            ``captures/image<N>.<form>``; when true, the whole burst is
            taken with one ``capture_sequence`` call and written to
            ``image<N>.<form>`` in the current working directory.

    Returns:
        None.
    """
    if not stack:
        # Local import: `os` is not among this file's top-level imports.
        import os

        # Create the output directory before capturing instead of reacting
        # to a failed capture: on Python 2 a missing directory surfaces as
        # IOError, which an `except OSError` handler would not catch.
        if not os.path.isdir('captures'):
            os.makedirs('captures')

        with picamera.PiCamera() as camera:
            camera.resolution = (1024, 768)  # 0.78 MP
            sleep(2)  # presumably lets the camera settle -- TODO confirm
            for _ in range(burst):
                camera.capture(
                    'captures/image{0}.{1}'.format(capture.count, form)
                )
                capture.count += 1
    else:
        # capture_sequence() does not infer the encoding from the file
        # names, so pass it explicitly; 'jpg' is spelled 'jpeg' there.
        encoding = 'jpeg' if form.lower() in ('jpg', 'jpeg') else form

        # Handle stacking after capture in C program (again)
        # NOTE(review): unlike the branch above, these files are written to
        # the working directory rather than captures/ -- confirm intended.
        with picamera.PiCamera() as camera:
            # Capture video frames and stack
            camera.resolution = (1024, 768)
            camera.framerate = 30
            sleep(2)
            camera.capture_sequence(
                ['image{0}.{1}'.format(capture.count + i, form)
                 for i in range(burst)],
                format=encoding
            )
        capture.count += burst
    # NOTE: if multiprocessing is ever used instead of a bash script, the
    # names of the files written by this call could be returned from here.
@image(take_images=0)
def in_the_air(vehicle):
    """ From the data gathered through dronekit, decide if we're in the
    air yet.

    Setting ``in_the_air.take_images = 1`` forces a positive answer without
    consulting ``vehicle`` at all (used when no drone could be connected).

    Args:
        vehicle: object exposing a ``mode`` attribute; only read when
            ``take_images`` is not 1.

    Returns:
        1 if images should be taken, otherwise 0.
    """
    if in_the_air.take_images == 1:
        return 1

    # Actually check using dronekit/mavlink
    # This could probably be made far more robust, but my initial
    # thoughts are that we don't want photos during sport or acro
    # mode, but Loiter is excellent for photography - we'll have
    # to test this to ensure that these are actually possible
    #
    # Mode names are compared case-insensitively and without separators so
    # that 'Alt Hold', 'ALT_HOLD' and 'AltHold' are all treated alike
    # (dronekit is expected to report upper-case names such as 'ALT_HOLD').
    mode = getattr(vehicle.mode, 'name', vehicle.mode)
    normalized = str(mode).upper().replace(' ', '').replace('_', '')
    return 1 if normalized in ('LOITER', 'ALTHOLD', 'POSHOLD') else 0
def main(vehicle, stack=None):
    """ Capture image bursts for as long as the process runs.

    Loops forever: whenever ``in_the_air(vehicle)`` is true a burst of
    ``BURST`` png images is captured and the running total is logged;
    otherwise the loop sleeps for 10 seconds before checking again.

    Args:
        vehicle: passed through to ``in_the_air``.
        stack: forwarded to ``capture``. When omitted, the module-level
            ``st`` flag set by the script entry point is used, or False if
            that flag has not been defined.
    """
    if stack is None:
        # Keeps the script entry point working while no longer failing with
        # NameError when main() is called without the global being set.
        stack = globals().get('st', False)

    while True:
        if in_the_air(vehicle):
            # Capture images first
            capture(burst=BURST, form='png', stack=stack)
            log.info('Captured %d images', capture.count)
            # Could probably expand this using multiprocessing,
            # but it might be easier to just spawn processes within the
            # bash script, as I've already written.
        else:
            sleep(10)
if __name__ == '__main__':
    try:
        v = connectToDrone()
    except NoSerialException:
        # Just continue and take captures _anyway_ - probably a horridly
        # designed script, but we're not worried about speed here, it just
        # has to flow and get the idea across: Capture data reliably
        v = None  # no vehicle; in_the_air() is forced on just below
        in_the_air.take_images = 1
    # Following can (and probably _should_) be replaced by argument parser.
    # Stacking is enabled only when the first argument is exactly '1';
    # a missing argument means "no stacking" instead of an IndexError.
    st = len(sys.argv) > 1 and sys.argv[1] == '1'
    main(v)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment