Created
March 5, 2014 07:02
-
-
Save vo/9362490 to your computer and use it in GitHub Desktop.
Quick hack to position hold with MAVLink using external webcam
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys, os, socket, pickle | |
from time import time | |
from optparse import OptionParser | |
UDP_IP = "127.0.0.1" | |
UDP_PORT = 31200 | |
# tell python where to find mavlink so we can import it | |
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), '../mavlink')) | |
from pymavlink import mavutil | |
class PosHold(object): | |
def __init__(self, mavmaster): | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
self.sock.bind((UDP_IP, UDP_PORT)) | |
self.sock.setblocking(0) | |
self.mavmaster = mavmaster | |
def mainloop(self): | |
# wait until the enter key is pressed | |
raw_input("Enter any key to continue") | |
# enter main loop | |
timer = time() | |
thro = 1400 | |
while True: | |
# default to released control | |
control = [0] * 8 | |
try: | |
# grab position data if it's there | |
data, addr = self.sock.recvfrom(1024) | |
diff_x, diff_y = pickle.loads(data) | |
# scaling diff_y | |
# negative number --> throttle up | |
# positive number --> throttle down | |
height = 480.0 | |
max_spd = 10 | |
normalized_y = (diff_y / height * 2.0) | |
spd = max_spd * -normalized_y | |
thro = 1400 + int(spd) # hover at 1400 | |
if (time() - timer) >= 0.02: | |
print "set throttle to", thro | |
control[2] = thro | |
send_rc(self.mavmaster, control) | |
timer = time() | |
except: | |
if (time() - timer) >= 1: | |
# time was allowed to reach 1 | |
# so no data was received | |
print "released rc" | |
release_rc(self.mavmaster) | |
timer = time() | |
# release control back to the radio | |
def release_rc(master): | |
# a value of 0 releases the control to what the radio says | |
values = [ 0 ] * 8 | |
for i in xrange(1): | |
master.mav.rc_channels_override_send(master.target_system, | |
master.target_component, *values) | |
# attempt to send a control. | |
# you can pass 0 to refer to what the radio says | |
# you can pass 0xFFFF to refer to keep the current value | |
def send_rc(master, data): | |
for i in xrange(1): | |
master.mav.rc_channels_override_send( | |
master.target_system, master.target_component, *data) | |
print ("sending rc: %s"%data) | |
def main(): | |
# read command line options | |
parser = OptionParser("readdata.py [options]") | |
parser.add_option("--baudrate", dest="baudrate", type='int', | |
help="master port baud rate", default=115200) | |
parser.add_option("--device", dest="device", default=None, help="serial device") | |
parser.add_option("--rate", dest="rate", default=4, type='int', help="requested stream rate") | |
parser.add_option("--source-system", dest='SOURCE_SYSTEM', type='int', | |
default=255, help='MAVLink source system for this GCS') | |
parser.add_option("--showmessages", dest="showmessages", action='store_true', | |
help="show incoming messages", default=False) | |
(opts, args) = parser.parse_args() | |
if opts.device is None: | |
print("You must specify a serial device") | |
sys.exit(1) | |
# create a mavlink serial instance | |
master = mavutil.mavlink_connection(opts.device, baud=opts.baudrate) | |
# wait for the heartbeat msg to find the system ID | |
master.wait_heartbeat() | |
# request data to be sent at the given rate | |
master.mav.request_data_stream_send(master.target_system, master.target_component, | |
mavutil.mavlink.MAV_DATA_STREAM_ALL, opts.rate, 1) | |
# run the main loop | |
app = PosHold(master) | |
app.mainloop() | |
if __name__ == '__main__': | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import cv | |
import socket, pickle | |
UDP_IP = "127.0.0.1" | |
UDP_PORT = 31200 | |
class Target: | |
def __init__(self): | |
self.capture = cv.CaptureFromCAM(1) | |
cv.NamedWindow("Target", 1) | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
def run(self): | |
# Capture first frame to get size | |
frame = cv.QueryFrame(self.capture) | |
frame_size = cv.GetSize(frame) | |
color_image = cv.CreateImage(cv.GetSize(frame), 8, 3) | |
grey_image = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_8U, 1) | |
moving_average = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_32F, 3) | |
first = True | |
while True: | |
closest_to_left = cv.GetSize(frame)[0] | |
closest_to_right = cv.GetSize(frame)[1] | |
color_image = cv.QueryFrame(self.capture) | |
# Smooth to get rid of false positives | |
cv.Smooth(color_image, color_image, cv.CV_GAUSSIAN, 3, 0) | |
if first: | |
difference = cv.CloneImage(color_image) | |
temp = cv.CloneImage(color_image) | |
cv.ConvertScale(color_image, moving_average, 1.0, 0.0) | |
first = False | |
# else: | |
# cv.RunningAvg(color_image, moving_average, 0.020, None) | |
# Convert the scale of the moving average. | |
cv.ConvertScale(moving_average, temp, 1.0, 0.0) | |
# Minus the current frame from the moving average. | |
cv.AbsDiff(color_image, temp, difference) | |
# Convert the image to grayscale. | |
cv.CvtColor(difference, grey_image, cv.CV_RGB2GRAY) | |
# Convert the image to black and white. | |
cv.Threshold(grey_image, grey_image, 70, 255, cv.CV_THRESH_BINARY) | |
# Dilate and erode to get people blobs | |
cv.Dilate(grey_image, grey_image, None, 18) | |
cv.Erode(grey_image, grey_image, None, 10) | |
storage = cv.CreateMemStorage(0) | |
contour = cv.FindContours(grey_image, storage, cv.CV_RETR_CCOMP, cv.CV_CHAIN_APPROX_SIMPLE) | |
points = [] | |
while contour: | |
bound_rect = cv.BoundingRect(list(contour)) | |
contour = contour.h_next() | |
pt1 = (bound_rect[0], bound_rect[1]) | |
pt2 = (bound_rect[0] + bound_rect[2], bound_rect[1] + bound_rect[3]) | |
points.append(pt1) | |
points.append(pt2) | |
cv.Rectangle(color_image, pt1, pt2, cv.CV_RGB(255,0,0), 1) | |
if len(points): | |
center_point = reduce(lambda a, b: ((a[0] + b[0]) / 2, (a[1] + b[1]) / 2), points) | |
img_center = (frame_size[0]/2, frame_size[1]/2) | |
diff = [center_point[0] - img_center[0], img_center[1] - center_point[1]] | |
datastr = pickle.dumps(diff) | |
self.sock.sendto(datastr, (UDP_IP, UDP_PORT)) | |
cv.Circle(color_image, center_point, 40, cv.CV_RGB(255, 255, 255), 1) | |
cv.Circle(color_image, center_point, 30, cv.CV_RGB(255, 100, 0), 1) | |
cv.Circle(color_image, center_point, 20, cv.CV_RGB(255, 255, 255), 1) | |
cv.Circle(color_image, center_point, 10, cv.CV_RGB(255, 100, 0), 1) | |
cv.ShowImage("Target", color_image) | |
# Listen for ESC key | |
c = cv.WaitKey(7) % 0x100 | |
if c == 27: | |
break | |
if __name__=="__main__": | |
t = Target() | |
t.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment