Skip to content

Instantly share code, notes, and snippets.

@edxmorgan
Created December 11, 2024 19:57
Show Gist options
  • Save edxmorgan/8ac96eccc555b1975f386e0c355f4b05 to your computer and use it in GitHub Desktop.
Save edxmorgan/8ac96eccc555b1975f386e0c355f4b05 to your computer and use it in GitHub Desktop.
import asyncio
import json
from urllib.parse import urlparse, parse_qs
import aiohttp
import websockets
import datetime
import csv
import os
import aiofiles
async def fetch_initial_data(http_session, http_url):
"""Fetch initial JSON data from the given HTTP URL."""
async with http_session.get(http_url) as resp:
resp.raise_for_status()
data = await resp.json()
# The JS code deletes data.status and then uses data.message.
# We assume data is of the form { "status": ..., "message": {...} }.
# Adjust as necessary depending on the actual response format.
data.pop("status", None)
return data.get("message", data)
def parse_query_string(query_string):
"""Parse the query string to extract path components."""
parsed = parse_qs(urlparse(query_string).query)
path = parsed.get("path", [""])[0] # e.g., 'mavlink/vehicles/1/components/1/messages/ATTITUDE' or 'v1/mavlink/vehicles/1/components/1/messages/RC_CHANNELS'
parts = path.split('/')
# If the path starts with 'v1', remove it
if parts and parts[0] == 'v1':
parts.pop(0)
# Remove 'mavlink'
if parts and parts[0] == 'mavlink':
parts.pop(0)
# Remove 'vehicles'
if parts and parts[0] == 'vehicles':
parts.pop(0)
vehicleId = None
if parts:
vehicleId_str = parts.pop(0) # e.g., '1'
vehicleId = int(vehicleId_str) if vehicleId_str.isdigit() else None
# Remove 'components'
if parts and parts[0] == 'components':
parts.pop(0)
componentId = None
if parts:
componentId_str = parts.pop(0) # e.g., '1'
componentId = int(componentId_str) if componentId_str.isdigit() else None
# Remove 'messages'
if parts and parts[0] == 'messages':
parts.pop(0)
messageName = None
if parts:
messageName = parts.pop(0) # e.g., 'ATTITUDE' or 'RC_CHANNELS'
return {
"vehicleId": vehicleId,
"componentId": componentId,
"messageName": messageName,
"path": path
}
async def write_to_csv(csv_writer, message_name, message_data):
"""Write a single row to the CSV file with a timestamp."""
timestamp = datetime.datetime.utcnow().isoformat() + 'Z' # ISO 8601 format in UTC
row = {
"timestamp": timestamp,
"message_type": message_name,
"message_data": json.dumps(message_data)
}
await csv_writer.writerow(row)
async def initialize_csv(csv_file_path, fieldnames):
"""Initialize the CSV file, writing headers if the file does not exist."""
file_exists = os.path.isfile(csv_file_path)
csv_file = await aiofiles.open(csv_file_path, mode='a', newline='', encoding='utf-8')
csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
if not file_exists:
await csv_file.write(','.join(fieldnames) + '\n') # Write header
return csv_file, csv_writer
async def main():
# Define the query strings for all messages
query_strings = [
"?path=mavlink/vehicles/1/components/1/messages/ATTITUDE",
"?path=mavlink/vehicles/1/components/1/messages/LOCAL_POSITION_NED",
"?path=v1/mavlink/vehicles/1/components/1/messages/RC_CHANNELS"
]
# Parse all query strings
paths_info = [parse_query_string(qs) for qs in query_strings]
# Define the host and endpoints
WSAPI = "ws://192.168.2.2:6040" # Adjust this as needed
websocket_url = f"{WSAPI}/v1/ws/mavlink"
# Prepare HTTP URLs for initial data fetch
http_urls = [f"http://192.168.2.2:6040/{info['path']}" for info in paths_info]
# Initialize CSV file
csv_file_path = "mavlink_messages_free_style.csv"
fieldnames = ["timestamp", "message_type", "message_data"]
csv_file, csv_writer = await initialize_csv(csv_file_path, fieldnames)
try:
async with aiohttp.ClientSession() as http_session:
# Create tasks to fetch all initial data concurrently
fetch_tasks = [fetch_initial_data(http_session, url) for url in http_urls]
initial_messages = await asyncio.gather(*fetch_tasks, return_exceptions=True)
for idx, message in enumerate(initial_messages):
if isinstance(message, Exception):
print(f"Error fetching initial data for path {paths_info[idx]['path']}: {message}")
else:
print(f"Initial message from HTTP fetch for {paths_info[idx]['messageName']}:")
print(json.dumps(message, indent=2))
print("-" * 50)
# Write initial message to CSV
await write_to_csv(csv_writer, paths_info[idx]['messageName'], message)
finally:
await csv_file.flush()
# Connect to the WebSocket
async with websockets.connect(websocket_url) as websocket:
print("[open] WebSocket connection established.")
while True:
try:
event_data = await websocket.recv()
except websockets.ConnectionClosed:
print("[close] WebSocket connection closed.")
break
try:
jsonData = json.loads(event_data)
except json.JSONDecodeError:
print("[error] Received non-JSON data from WebSocket.")
continue
hdr = jsonData.get("header", {})
msg = jsonData.get("message", {})
# Iterate through all path infos to check if the message matches any criteria
for info in paths_info:
vehicleId = info["vehicleId"]
componentId = info["componentId"]
messageName = info["messageName"]
if vehicleId is not None and hdr.get("system_id") != vehicleId:
continue
if componentId is not None and hdr.get("component_id") != componentId:
continue
if messageName is not None and msg.get("type") != messageName:
continue
# If we get here, it's a message we care about.
filtered_message = msg if msg else jsonData
# print(f"Filtered message from WebSocket for {messageName}:")
# print(json.dumps(filtered_message, indent=2))
# print("-" * 50)
# Write the filtered message to CSV
await write_to_csv(csv_writer, messageName, filtered_message)
# Close the CSV file when done
await csv_file.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n[exit] Script terminated by user.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment