Created
May 18, 2018 22:10
-
-
Save Andrew-William-Smith/741d7b609be54455a283cdfbe17a98a5 to your computer and use it in GitHub Desktop.
Autonomous nanoquad navigation of arbitrary paths
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Andrew S 18 May 2018 | |
"Autonomous nanoquad navigation of arbitrary paths" | |
Information Engineering, St. Mark's School of Texas | |
Abstract: This program uses machine vision to allow a nanoquad to trace an arbitrary path, | |
defined in a certain colour.""" | |
import cv2 | |
import logging | |
import math | |
import numpy | |
import skimage.morphology | |
import time | |
import cflib.crtp | |
from cflib.crazyflie import Crazyflie | |
from cflib.crazyflie.log import LogConfig | |
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie | |
from cflib.crazyflie.syncLogger import SyncLogger | |
# Colours to be detected (essentally random to start) | |
lowerStartBound = numpy.array([120, 60, 30], dtype=numpy.uint8) | |
upperStartBound = numpy.array([255, 170, 100], dtype=numpy.uint8) | |
lowerPathBound = numpy.array([120, 60, 30], dtype=numpy.uint8) | |
upperPathBound = numpy.array([255, 170, 100], dtype=numpy.uint8) | |
lowerEndBound = numpy.array([120, 60, 30], dtype=numpy.uint8) | |
upperEndBound = numpy.array([255, 170, 100], dtype=numpy.uint8) | |
envelope = 30 | |
gFrame = None | |
logging.basicConfig(level=logging.ERROR) | |
# Drone control parameters | |
dropThreshold = 0.05 # Distance from the ground at which to halt motors | |
navThreshold = 20 # Pixel threshold at which to consider navigation points "reached" | |
scf = None | |
pathPoint = 0 | |
# Variables for rectifying software height with flow sensor height | |
zCurrent = 0 | |
zCmdStart = 0 | |
zHardwareQueue = [] | |
zHardware = 0 | |
battery = 0 | |
def parseDistance(distance, token, moveTime): | |
"""Convert a distance sigil into a control velocity. Currently unused.""" | |
if distance.lower().endswith(token): | |
return float(distance[:-len(token)]) / moveTime | |
else: | |
return float(distance) | |
def logReceived(timestamp, data, logconf): | |
global battery, zHardware, zHardwareQueue | |
# Set battery percentage | |
battery = 100 * (data['pm.vbatMV'] - 3800) / 380 | |
# Set hardware z-coordinate from average of last 3 values | |
zHardwareQueue.append(data['stateEstimate.z']) | |
if len(zHardwareQueue) > 2: | |
zHardware = sum(zHardwareQueue) / 3 | |
zHardwareQueue = [] | |
return | |
def runMoveCommand(cmd): | |
"""Run a synchronous drone control command. Syntax: 'vy vx yaw altitude time'""" | |
global zCurrent, hover | |
cmd = cmd.split() | |
zCmdStart = zCurrent | |
try: | |
moveTime = float(cmd[4]) | |
# Handle distance vs. speed mapping | |
vx = parseDistance(cmd[0], 'm', moveTime) | |
vy = parseDistance(cmd[1], 'm', moveTime) | |
yaw = parseDistance(cmd[2], 'd', moveTime) | |
z = float(cmd[3]) | |
# Perform altitude changes gradually to allow for sensor adjustment | |
# if zCurrent != 0: | |
# zCurrent = zHardware | |
iterations = moveTime / 0.05 | |
for i in range(int(iterations)): | |
zCurrent += (z - zCmdStart) / iterations | |
# Halt rotors on descent | |
if zCurrent < dropThreshold and zCurrent < zCmdStart: | |
scf.commander.send_stop_setpoint() | |
zCurrent = 0 | |
break | |
# Maximum height of 1.5 meters (maximum supported by laser rangefinder, confirmed by hardware) | |
elif zCurrent >= 1.5 and zHardware >= 1.5 and zCurrent > zCmdStart: | |
print('Exceeded maximum height: returning to 1.5 metres.') | |
zCurrent = 1.5 | |
scf.commander.send_hover_setpoint(vx, vy, yaw, 1.5) | |
else: | |
scf.commander.send_hover_setpoint(vx, vy, yaw, zCurrent) | |
time.sleep(0.05) | |
except Exception as e: | |
print('Syntax error') | |
def getLargestContour(lower, upper): | |
"""Find the largest contour within a colour range.""" | |
pathMask = cv2.inRange(rawFrame, lower, upper) | |
pathMaskClose = cv2.morphologyEx(pathMask, cv2.MORPH_CLOSE, kernelClose) | |
_, contours, _ = cv2.findContours(pathMaskClose, cv2.RETR_EXTERNAL, | |
cv2.CHAIN_APPROX_SIMPLE) | |
maxArea = 0 | |
largestContour = None | |
for contour in contours: | |
area = cv2.contourArea(contour) | |
if area > maxArea: | |
maxArea = area | |
largestContour = contour | |
return largestContour | |
def uintAddAbsolute(val, addend): | |
"""Add a value to a numpy.uint8, without wrapping on over/underflow.""" | |
return (val.astype(numpy.int16) + addend).clip(0, 255).astype(numpy.uint8) | |
def getPointDistance(p1, p2): | |
"""Calculate the distance between two points, represented as arrays of the form (x, y)""" | |
return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) | |
def recalibrateColour(evt, x, y, flags, param): | |
"""Event to set path and start/end semaphore pom colours on click. | |
Left click: set path colour | |
Middle click: set start/quadcopter semaphore colour | |
Right click: set end semaphore pom colour""" | |
global lowerStartBound, upperStartBound, \ | |
lowerPathBound, upperPathBound, \ | |
lowerEndBound, upperEndBound | |
if evt == cv2.EVENT_LBUTTONUP: | |
envelope = cv2.getTrackbarPos('Path e(L)', 'camera') | |
hVal, sVal, vVal = gFrame[y, x] | |
lowerPathBound = numpy.array([uintAddAbsolute(hVal, -envelope), | |
uintAddAbsolute(sVal, -envelope), | |
uintAddAbsolute(vVal, -envelope)], | |
dtype=numpy.uint8) | |
upperPathBound = numpy.array([uintAddAbsolute(hVal, envelope), | |
uintAddAbsolute(sVal, envelope), | |
uintAddAbsolute(vVal, envelope)], | |
dtype=numpy.uint8) | |
print(lowerPathBound, upperPathBound) | |
elif evt == cv2.EVENT_MBUTTONUP: | |
envelope = cv2.getTrackbarPos('Start e(M)', 'camera') | |
hVal, sVal, vVal = gFrame[y, x] | |
lowerStartBound = numpy.array([uintAddAbsolute(hVal, -envelope), | |
uintAddAbsolute(sVal, -envelope), | |
uintAddAbsolute(vVal, -envelope)], | |
dtype=numpy.uint8) | |
upperStartBound = numpy.array([uintAddAbsolute(hVal, envelope), | |
uintAddAbsolute(sVal, envelope), | |
uintAddAbsolute(vVal, envelope)], | |
dtype=numpy.uint8) | |
print(lowerStartBound, upperStartBound) | |
elif evt == cv2.EVENT_RBUTTONUP: | |
envelope = cv2.getTrackbarPos('End e(R)', 'camera') | |
hVal, sVal, vVal = gFrame[y, x] | |
lowerEndBound = numpy.array([uintAddAbsolute(hVal, -envelope), | |
uintAddAbsolute(sVal, -envelope), | |
uintAddAbsolute(vVal, -envelope)], | |
dtype=numpy.uint8) | |
upperEndBound = numpy.array([uintAddAbsolute(hVal, envelope), | |
uintAddAbsolute(sVal, envelope), | |
uintAddAbsolute(vVal, envelope)], | |
dtype=numpy.uint8) | |
print(lowerEndBound, upperEndBound) | |
return | |
if __name__ == '__main__': | |
# Initialise drivers | |
cflib.crtp.init_drivers(enable_debug_driver=False) | |
# Scan for quadcopters on channel 0xE7..E4 | |
print('Scanning for devices...') | |
devices = cflib.crtp.scan_interfaces(0xE7E7E7E7E4) | |
if len(devices) == 0: | |
print('No devices found on channel. Terminating program.') | |
else: | |
# Get device on highest-throughput channel | |
device = devices[len(devices) - 1][0] | |
print('Using device: ' + device) | |
# Initialise logger with a period of 50 ms per sample | |
logReport = LogConfig(name='Reporter', period_in_ms=50) | |
logReport.add_variable('stateEstimate.z', 'float') | |
logReport.add_variable('pm.vbatMV', 'float') | |
logReport.data_received_cb.add_callback(logReceived) | |
# Connect to Crazyflie | |
cf = Crazyflie(rw_cache='./cfcache') | |
with SyncCrazyflie(device, cf=cf) as syncCf: | |
with SyncLogger(syncCf, logReport) as logger: | |
scf = syncCf.cf | |
# Calibrate flow sensor | |
print('Calibrating flow sensor...') | |
scf.param.set_value('kalman.resetEstimation', '1') | |
time.sleep(0.1) | |
scf.param.set_value('kalman.resetEstimation', '0') | |
time.sleep(2) | |
print('Initialised Crazyflie.') | |
# Establish camera connexion and processing kernels | |
camera = cv2.VideoCapture(0) | |
kernelOpen = numpy.ones((5, 5)) | |
kernelClose = numpy.ones((20, 20)) | |
# Configure UI | |
cv2.namedWindow('camera') | |
cv2.setMouseCallback('camera', recalibrateColour) | |
cv2.createTrackbar('Start e(M)', 'camera', 30, 100, lambda x: None) | |
cv2.createTrackbar('Path e(L)', 'camera', 30, 100, lambda x: None) | |
cv2.createTrackbar('End e(R)', 'camera', 30, 100, lambda x: None) | |
cv2.createTrackbar('e permille', 'camera', 20, 100, lambda x: None) | |
setPath = False | |
path = None | |
while True: | |
# Capture frame and convert colour space | |
frameRead, rawFrame = camera.read() | |
rawFrame = cv2.resize(rawFrame, (680, 440)) | |
rawFrame = cv2.cvtColor(rawFrame, cv2.COLOR_BGR2HSV) | |
frame = rawFrame.copy() | |
gFrame = rawFrame.copy() | |
if not setPath: | |
path = [] | |
# Path isolation ---------------------------------------------------------------------- | |
largestContour = getLargestContour(lowerPathBound, upperPathBound) | |
pathContour = None | |
# Ensure that path contour exists and contains points | |
if largestContour is not None and len(largestContour) > 0: | |
cv2.drawContours(frame, [largestContour], -1, (255, 0, 0), 3) | |
# Draw sparse polygon (structure: begins at top left) | |
epsilon = (float(cv2.getTrackbarPos('e permille', 'camera')) * | |
cv2.arcLength(largestContour, True) / 1000) | |
pathOutline = cv2.approxPolyDP(largestContour, epsilon, True) | |
cv2.drawContours(frame, [pathOutline], -1, (0, 255, 255), 2) | |
# Find middle path through path contour | |
# Define path mask; dimensions are defined as (height, width), in contrast with all | |
# other coordinates in this program | |
pathMask = numpy.zeros((440, 680), numpy.uint8) | |
cv2.drawContours(pathMask, [pathOutline], -1, (1), cv2.FILLED) | |
pathMask = skimage.morphology.skeletonize_3d(pathMask).astype(numpy.uint8) * 255 | |
_, pathContour, _ = cv2.findContours(pathMask, cv2.RETR_EXTERNAL, | |
cv2.CHAIN_APPROX_SIMPLE) | |
# Start point isolation and start coordinates ----------------------------------------- | |
startCentroidX = 0 | |
startCentroidY = 0 | |
largestContour = getLargestContour(lowerStartBound, upperStartBound) | |
if largestContour is not None and len(largestContour) > 0: | |
# Draw start centroid in orange | |
moments = cv2.moments(largestContour) | |
startCentroidX = int(moments['m10'] / moments['m00']) | |
startCentroidY = int(moments['m01'] / moments['m00']) | |
cv2.rectangle(frame, (startCentroidX - 2, startCentroidY - 2), | |
(startCentroidX + 2, startCentroidY + 2), (200, 255, 255), 2) | |
# End point isolation and end coordinates --------------------------------------------- | |
endCentroidX = 0 | |
endCentroidY = 0 | |
largestContour = getLargestContour(lowerEndBound, upperEndBound) | |
if largestContour is not None and len(largestContour) > 0: | |
# Draw end centroid in green | |
moments = cv2.moments(largestContour) | |
endCentroidX = int(moments['m10'] / moments['m00']) | |
endCentroidY = int(moments['m01'] / moments['m00']) | |
cv2.rectangle(frame, (endCentroidX - 2, endCentroidY - 2), | |
(endCentroidX + 2, endCentroidY + 2), (250, 255, 255), 2) | |
# Path start determination ------------------------------------------------------------ | |
if pathContour is not None and len(pathContour) > 0 and len(pathContour[0]) > 0: | |
cv2.drawContours(frame, pathContour, -1, (100, 255, 255), 3) | |
# Find closest and next-closest points to start centroid | |
closestPoint = None | |
closestDistance = 10000 | |
nextPoint = None | |
endPoint = None | |
closestEnd = 10000 | |
pathContour = pathContour[0] | |
for i in range(len(pathContour)): | |
distance = getPointDistance((startCentroidX, startCentroidY), pathContour[i][0]) | |
if distance < closestDistance: | |
closestPoint = i | |
closestDistance = distance | |
nextPoint = closestPoint | |
distance = getPointDistance((endCentroidX, endCentroidY), pathContour[i][0]) | |
if distance < closestEnd: | |
endPoint = i | |
closestEnd = distance | |
# Determine direction in which to iterate through contour | |
# Edge cases: | |
# - closest point at end of contour array, next point at index 0: decrement | |
# - closest point at index 0 in contour array, next point at end: increment | |
decrement = False | |
if (closestPoint == len(pathContour) - 1) and (nextPoint == 0): | |
decrement = True | |
elif (closestPoint == 0) and (nextPoint == len(pathContour) - 1): | |
decrement = False | |
elif closestPoint > nextPoint: | |
decrement = True | |
else: | |
decrement = False | |
# Create path array, "walking" in the determined direction and wrapping around when ends of array are reached | |
currentPoint = closestPoint | |
while True: | |
path.append(pathContour[currentPoint][0]) | |
if decrement: | |
currentPoint -= 1 | |
if currentPoint < 0: | |
currentPoint = len(pathContour) - 1 | |
else: | |
currentPoint += 1 | |
if currentPoint > len(pathContour) - 1: | |
currentPoint = 0 | |
# Terminate path walking once nearest point to end pom is reached | |
if currentPoint == endPoint: | |
break | |
else: | |
# Quadcopter coordinates ------------------------------------------------------------- | |
quadCentroidX = 0 | |
quadCentroidY = 0 | |
largestContour = getLargestContour(lowerStartBound, upperStartBound) | |
if largestContour is not None and len(largestContour) > 0: | |
# Draw quadcopter centroid | |
moments = cv2.moments(largestContour) | |
quadCentroidX = int(moments['m10'] / moments['m00']) | |
quadCentroidY = int(moments['m01'] / moments['m00']) | |
cv2.rectangle(frame, (quadCentroidX - 2, quadCentroidY - 2), | |
(quadCentroidX + 2, quadCentroidY + 2), (200, 255, 255), 2) | |
# Navigate --------------------------------------------------------------------------- | |
dx = path[pathPoint][0] - quadCentroidX | |
dy = path[pathPoint][1] - quadCentroidY | |
# Consider path point "reached" once distance is within threshold | |
if abs(dx) < navThreshold and abs(dy) < navThreshold: | |
pathPoint += 1 | |
print('Navigating to point {}'.format(pathPoint)) | |
# Land once last point reached | |
if pathPoint == len(path): | |
print('Endpoint reached. Landing.') | |
runMoveCommand('0 0 0 0 1') | |
break | |
continue | |
# Velocity scale constant = -0.005 | |
vx = dx * -0.005 | |
vy = dy * -0.005 | |
runMoveCommand('{} {} 0 .3 .1'.format(vy, vx)) | |
frame = cv2.cvtColor(frame, cv2.COLOR_HSV2BGR) | |
# Display camera image and contour | |
if len(path) > 0: | |
if not setPath: | |
# Animate path | |
for point in path: | |
cv2.rectangle(frame, (point[0] - 2, point[1] - 2), | |
(point[0] + 2, point[1] + 2), | |
(0, 255, 0), 2) | |
# Note: this completely breaks multithreading. Thus, navigation is performed synchronously. | |
cv2.imshow('camera', frame) | |
# Press <a> to confirm set path and begin navigation | |
if cv2.waitKey(10) == ord('a'): | |
setPath = True | |
# Take off | |
runMoveCommand('0 0 0 .3 .5') | |
print('Takeoff complete. Beginning navigation sequence.') | |
else: | |
cv2.imshow('camera', frame) | |
cv2.waitKey(10) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment