Created
April 27, 2020 13:42
-
-
Save Williangalvani/838a7f14b2dec1af1d25879f81e82293 to your computer and use it in GitHub Desktop.
Waterlinked DVL Integration script (WIP)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This script is used to integrate the Waterlinked A50 DVL with ardupilot. | |
It uses a tcp connection to port | |
There must also be a mavlink connection avaiable at 0.0.0.0:14551 so the script can get | |
ATTITUDE messages | |
""" | |
import asyncio | |
from pymavlink import mavutil | |
import math | |
import time | |
import socket | |
import sys | |
import json | |
import argparse | |
if sys.version_info[0] < 3 or sys.version_info[1] < 6: | |
raise Exception("You must use Python >= 3.6 for async support!") | |
def cmd_set_home(lat, lon, alt): | |
'''called when user selects "Set Home (with height)" on map''' | |
print("Setting home to: ", lat, lon, alt) | |
return master.mav.set_gps_global_origin_send( | |
0, | |
lat, # lat | |
lon, # lon | |
alt) # param7 | |
# Create the connection | |
master = mavutil.mavlink_connection('udpin:0.0.0.0:14551', dialect="ardupilotmega") | |
# Wait a heartbeat before sending commands | |
master.wait_heartbeat() | |
master.mav.statustext_send(text=bytearray("#test", encoding="utf-8"), severity=mavutil.mavlink.MAV_SEVERITY_NOTICE) | |
# Set GPS origin ( This is the Blue Robotics parking lot) | |
#cmd_set_home(338414762,-1183351514,1) | |
cmd_set_home(-274905547, -485299403,1) | |
last_attitude = [0.0,0.0,0.0] | |
current_attitude = [0.0,0.0,0.0] | |
last_position = [0.0,0.0,0.0] | |
current_position = [0.0,0.0,0.0] | |
last_time = None | |
last_response = {"vx": 0, "vy": 0} | |
def send_vision(x, y, z, rotation=[0,0,0], confidence=100, dt=125000): | |
"Sends message VISION_POSITION_DELTA to flight controller" | |
print("sending... dt: {0}us, \tconfidence: {1}".format(dt, confidence)) | |
master.mav.vision_position_delta_send( | |
0, # time | |
dt, # delta time (us) | |
rotation, # angle delta | |
[x, y, z], # position delta | |
confidence) # confidence (0-100%) | |
def send_rangefinder(distance): | |
print(int(distance*100)) | |
master.mav.distance_sensor_send( | |
0, # time_boot_ms | |
0, # min_distance | |
5000, # max_distance | |
int(distance*100), # distance | |
2, # type MAVLINK | |
0, # device id | |
25, # PITCH 270 | |
0) | |
async def receive_attitude(): | |
"Receive attitude messages from flight controller" | |
global current_attitude | |
while True: | |
await asyncio.sleep(0.001) | |
msg = master.recv_match() | |
if not msg: | |
continue | |
if msg.get_type() == 'ATTITUDE': | |
msg = msg.to_dict() | |
roll = msg['roll'] | |
pitch = msg['pitch'] | |
yaw = msg['yaw'] | |
current_attitude = [roll, pitch, yaw] | |
async def write_dvl(ip, port): | |
"""every time a new dvl reading is available, calculate new angle deltas | |
and sends to the flight controller | |
""" | |
reader, writer = await asyncio.open_connection(ip, port) | |
global last_attitude | |
global last_time | |
global current_attitude | |
while True: | |
data = await reader.read(1000) | |
if data is None: | |
await asyncio.sleep(0.01) | |
continue | |
try: | |
data = json.loads(data) | |
except: | |
await asyncio.sleep(0.01) | |
continue | |
if data["velocity_valid"] == False: | |
await asyncio.sleep(0.01) | |
continue | |
vx, vy, vz, fom = data["vx"], data["vy"], data["vz"], data["fom"] | |
dt = data["time"] / 1000 | |
dx = dt*vx | |
dy = dt*vy | |
dz = dt*vz | |
# handle bad readings here | |
#if fom > 0.005: | |
# print("bad fom:", fom) | |
# continue | |
send_rangefinder(data["altitude"]) | |
# Subtracts current altitude from last attitude to get the delta | |
angles = list(map(float.__sub__, current_attitude, last_attitude)) | |
angles[2] = angles[2] % (math.pi*2) | |
send_vision(dx, dy, dz, list(angles), dt=int(dt*1e6), confidence=100) | |
last_attitude = current_attitude | |
await asyncio.sleep(0.01) | |
async def main(): | |
"Runs attitude and dvl tasks" | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument('-i', '--ip', help='DVL IP address', type=str, default='192.168.2.13') | |
parser.add_argument('-p', '--port', help='TCP port at the DVL', type=int, default=16171) | |
args = parser.parse_args() | |
task1 = asyncio.create_task(receive_attitude()) | |
task2 = asyncio.create_task(write_dvl(args.ip, args.port)) | |
await asyncio.gather(task1, task2) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment