Last active
May 22, 2020 09:47
-
-
Save vo/9c1a790cfd413512b153f928b24bb081 to your computer and use it in GitHub Desktop.
MAVLink packet inspector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pymavlink import mavutil | |
import threading | |
# Endpoint strings for the two MAVLink links this script bridges.
# `connection` is the link the UAS-side loop reads from; `local` is the link
# the GCS-side loop reads from (see uas_loop / gcs_loop).
# NOTE(review): per pymavlink, "udp:" presumably binds and listens while
# "udpout:" initiates towards the given address -- confirm against mavutil.
UAS_ENDPOINT = "udp:0.0.0.0:14592"
GCS_ENDPOINT = "udpout:127.0.0.1:14593"

connection = mavutil.mavlink_connection(UAS_ENDPOINT)
local = mavutil.mavlink_connection(GCS_ENDPOINT)
def on_msg_from_uas(msg):
    """Forward a message received on the UAS link to the ``local`` link.

    The buffer returned by ``msg.get_msgbuf()`` is handed to ``local.write``
    as-is; nothing is returned.
    """
    local.write(msg.get_msgbuf())
def on_msg_from_gcs(msg):
    """Forward a message received on the ``local`` (GCS) link to the UAS link.

    The buffer returned by ``msg.get_msgbuf()`` is handed to
    ``connection.write`` as-is; nothing is returned.
    """
    connection.write(msg.get_msgbuf())
def _log_uas_msg(msg):
    """Print a one-line summary of a mission or STATUSTEXT message from the UAS.

    Messages of any other type (and MISSION_CURRENT) print nothing.
    """
    msg_type = msg.get_type()
    # Some types matching the prefixes below (e.g. MISSION_REQUEST_LIST,
    # MISSION_REQUEST_PARTIAL_LIST) have no ``seq`` field; fall through to the
    # generic branch for those instead of raising.
    seq = getattr(msg, "seq", None)
    # print("..." % ...) with a single argument behaves the same whether
    # ``print`` is a statement (Python 2) or a function (Python 3).
    if msg_type.startswith("MISSION_REQUEST") and seq is not None:
        print("uas: MISSION_REQUEST seq: %s" % (seq,))
    elif msg_type.startswith("MISSION_ITEM") and seq is not None:
        print("uas: MISSION_ITEM %s" % (seq,))
    elif msg_type.startswith("MISSION") and msg_type != "MISSION_CURRENT":
        print("uas: %s" % (msg_type,))
    if msg_type.startswith("STATUSTEXT"):
        print("uas: %s" % (msg.text,))


def uas_loop():
    """Relay messages from the UAS link (``connection``) to the GCS link, forever.

    Each received message is summarised on stdout when it is a mission or
    STATUSTEXT message, then passed to ``on_msg_from_uas``. Receiving, logging
    and forwarding are guarded separately so that a failure while logging can
    never prevent a packet from being forwarded. Errors are reported rather
    than silently discarded, and the loop keeps running (best-effort relay).
    This function never returns.
    """
    while True:
        try:
            # blocking=True makes pymavlink wait for data instead of returning
            # None immediately, so an idle link no longer spins the CPU.
            msg = connection.recv_match(blocking=True)
        except Exception as exc:
            print("uas: receive error: %r" % (exc,))
            continue
        if not msg:
            continue

        try:
            _log_uas_msg(msg)
        except Exception as exc:
            print("uas: could not summarise message: %r" % (exc,))

        try:
            on_msg_from_uas(msg)
        except Exception as exc:
            print("uas: forward error: %r" % (exc,))
def _log_gcs_msg(msg):
    """Print a one-line summary of a mission or STATUSTEXT message from the GCS.

    MISSION_ITEM / MISSION_ITEM_INT show their sequence number and command,
    MISSION_COUNT shows its count, any other MISSION* type shows only its
    name. Messages of other types print nothing.
    """
    msg_type = msg.get_type()
    # print("..." % ...) with a single argument behaves the same whether
    # ``print`` is a statement (Python 2) or a function (Python 3).
    if msg_type in ("MISSION_ITEM", "MISSION_ITEM_INT"):
        print("gcs: %s seq: %s cmd: %s" % (msg_type, msg.seq, msg.command))
    elif msg_type == "MISSION_COUNT":
        print("gcs: %s count: %s" % (msg_type, msg.count))
    elif msg_type.startswith("MISSION"):
        print("gcs: %s" % (msg_type,))
    if msg_type.startswith("STATUSTEXT"):
        print("gcs: %s" % (msg.text,))


def gcs_loop():
    """Relay messages from the GCS link (``local``) to the UAS link, forever.

    Each received message is summarised on stdout when it is a mission or
    STATUSTEXT message, then passed to ``on_msg_from_gcs``. Receiving, logging
    and forwarding are guarded separately so that a failure while logging can
    never prevent a packet from being forwarded. Errors are reported rather
    than silently discarded, and the loop keeps running (best-effort relay).
    This function never returns.
    """
    while True:
        try:
            # blocking=True makes pymavlink wait for data instead of returning
            # None immediately, so an idle link no longer spins the CPU.
            msg = local.recv_match(blocking=True)
        except Exception as exc:
            print("gcs: receive error: %r" % (exc,))
            continue
        if not msg:
            continue

        try:
            _log_gcs_msg(msg)
        except Exception as exc:
            print("gcs: could not summarise message: %r" % (exc,))

        try:
            on_msg_from_gcs(msg)
        except Exception as exc:
            print("gcs: forward error: %r" % (exc,))
# Run one relay loop per direction, each in its own thread.
uas_thread = threading.Thread(target=uas_loop)
gcs_thread = threading.Thread(target=gcs_loop)

# Both targets loop forever. Marking the threads as daemons lets the process
# exit once the main thread finishes instead of hanging on them.
uas_thread.daemon = True
gcs_thread.daemon = True

uas_thread.start()
gcs_thread.start()

try:
    # Join with a timeout so the main thread wakes up regularly and can
    # receive KeyboardInterrupt; an untimed join() can block Ctrl-C.
    while uas_thread.is_alive() or gcs_thread.is_alive():
        uas_thread.join(0.5)
        gcs_thread.join(0.5)
except KeyboardInterrupt:
    # Ctrl-C: fall off the end of the script; the daemon threads end with it.
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment