Created
December 11, 2024 19:57
-
-
Save edxmorgan/8ac96eccc555b1975f386e0c355f4b05 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
from urllib.parse import urlparse, parse_qs | |
import aiohttp | |
import websockets | |
import datetime | |
import csv | |
import os | |
import aiofiles | |
async def fetch_initial_data(http_session, http_url):
    """Download the JSON document at *http_url* and return its payload.

    The request is issued through ``http_session.get``; a non-success HTTP
    status raises via ``raise_for_status()``.

    The decoded body is expected to look like
    ``{"status": ..., "message": {...}}``.  The ``"status"`` entry is
    discarded and the value stored under ``"message"`` is returned.  When
    there is no ``"message"`` key, the remaining body (without ``"status"``)
    is returned instead.
    """
    async with http_session.get(http_url) as response:
        response.raise_for_status()
        payload = await response.json()

    # Mirrors the JS client this was ported from: drop "status" first, then
    # prefer the nested "message" object over the envelope.
    payload.pop("status", None)
    return payload.get("message", payload)
def parse_query_string(query_string):
    """Split the ``path`` query parameter into its MAVLink components.

    *query_string* is a URL or bare query such as
    ``"?path=mavlink/vehicles/1/components/1/messages/ATTITUDE"`` (an
    optional leading ``v1/`` segment is also accepted).

    The path is consumed left to right: the literal segments ``v1``,
    ``mavlink``, ``vehicles``, ``components`` and ``messages`` are skipped
    when they appear in their expected position, and the segment following
    each of the last three is taken as the vehicle id, component id and
    message name respectively.

    Returns a dict with the keys ``"vehicleId"`` and ``"componentId"``
    (``int``, or ``None`` when the segment is missing or not a decimal
    number), ``"messageName"`` (``str`` or ``None``) and ``"path"`` (the raw
    path string, ``""`` when the parameter is absent).
    """
    parsed = parse_qs(urlparse(query_string).query)
    path = parsed.get("path", [""])[0]
    parts = path.split('/')

    # Optional prefixes in front of the vehicle id.
    for prefix in ('v1', 'mavlink', 'vehicles'):
        _drop_leading(parts, prefix)
    vehicle_id = _pop_int_or_none(parts)

    _drop_leading(parts, 'components')
    component_id = _pop_int_or_none(parts)

    _drop_leading(parts, 'messages')
    message_name = parts.pop(0) if parts else None  # e.g. 'ATTITUDE'

    return {
        "vehicleId": vehicle_id,
        "componentId": component_id,
        "messageName": message_name,
        "path": path,
    }


def _drop_leading(parts, literal):
    """Remove the first element of *parts* in place if it equals *literal*."""
    if parts and parts[0] == literal:
        parts.pop(0)


def _pop_int_or_none(parts):
    """Pop the first element of *parts* and return it as an int, else None.

    Returns ``None`` when *parts* is empty or the popped segment is not a
    decimal number.  The segment is consumed either way.
    """
    if not parts:
        return None
    segment = parts.pop(0)
    # isdecimal() rather than isdigit(): isdigit() is also true for
    # characters such as superscript "²", which int() rejects with ValueError.
    return int(segment) if segment.isdecimal() else None
async def write_to_csv(csv_writer, message_name, message_data):
    """Append one timestamped row for *message_data* through *csv_writer*.

    The row has three columns: ``timestamp`` (current UTC time in ISO 8601
    form with a trailing ``Z``), ``message_type`` (*message_name*) and
    ``message_data`` (*message_data* serialized with ``json.dumps``).

    ``csv_writer.writerow(row)`` must return an awaitable; it is awaited
    before this coroutine returns.
    """
    # datetime.utcnow() is deprecated since Python 3.12, so take an aware UTC
    # "now" instead.  tzinfo is stripped before formatting so the text stays
    # "...Z" rather than "...+00:00Z".
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    timestamp = now_utc.replace(tzinfo=None).isoformat() + 'Z'
    row = {
        "timestamp": timestamp,
        "message_type": message_name,
        "message_data": json.dumps(message_data),
    }
    await csv_writer.writerow(row)
async def initialize_csv(csv_file_path, fieldnames):
    """Open *csv_file_path* for appending and return ``(csv_file, csv_writer)``.

    The file is opened through ``aiofiles`` in append mode (UTF-8,
    ``newline=''``) and wrapped in a ``csv.DictWriter`` using *fieldnames*.
    A header row is written when the file is empty, which covers both a
    newly created file and one left empty by an earlier run.

    The caller owns the returned file handle and must close it.  If writing
    the header fails, the handle is closed here before the error propagates.
    """
    csv_file = await aiofiles.open(csv_file_path, mode='a', newline='', encoding='utf-8')
    try:
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        # The file exists now that it is open, so its size says whether a
        # header is still needed.
        if os.path.getsize(csv_file_path) == 0:
            # Send the header through the csv writer so it gets the same
            # quoting and line terminator as the data rows.  writerow() hands
            # back the result of the file's write(), which is awaitable here.
            await csv_writer.writerow(dict(zip(fieldnames, fieldnames)))
    except BaseException:
        # Do not leak the handle on failure or cancellation.
        await csv_file.close()
        raise
    return csv_file, csv_writer
async def main(host="192.168.2.2:6040", csv_file_path="mavlink_messages_free_style.csv"):
    """Log selected MAVLink messages from a REST/WebSocket endpoint to CSV.

    Args:
        host: ``host:port`` of the server; used for both the initial HTTP
            fetches (``http://{host}/<path>``) and the WebSocket stream
            (``ws://{host}/v1/ws/mavlink``).
        csv_file_path: CSV file that rows are appended to.

    Steps:
        1. Parse the hard-coded query strings into vehicle/component/message
           filters.
        2. Fetch the current value of each message over HTTP concurrently,
           print it and write it to the CSV file.  A failed fetch is reported
           and skipped.
        3. Flush the file, then read the WebSocket until the connection
           closes, writing every message that satisfies one of the filters.

    The CSV file is closed on every exit path, including errors and
    cancellation.
    """
    # Define the query strings for all messages
    query_strings = [
        "?path=mavlink/vehicles/1/components/1/messages/ATTITUDE",
        "?path=mavlink/vehicles/1/components/1/messages/LOCAL_POSITION_NED",
        "?path=v1/mavlink/vehicles/1/components/1/messages/RC_CHANNELS",
    ]
    paths_info = [parse_query_string(qs) for qs in query_strings]

    # Both endpoints are derived from the single *host* value.
    websocket_url = f"ws://{host}/v1/ws/mavlink"
    http_urls = [f"http://{host}/{info['path']}" for info in paths_info]

    fieldnames = ["timestamp", "message_type", "message_data"]
    csv_file, csv_writer = await initialize_csv(csv_file_path, fieldnames)

    try:
        try:
            async with aiohttp.ClientSession() as http_session:
                # Fetch all initial data concurrently; exceptions come back
                # as results so one failing path does not abort the others.
                fetch_tasks = [fetch_initial_data(http_session, url) for url in http_urls]
                initial_messages = await asyncio.gather(*fetch_tasks, return_exceptions=True)

            for info, message in zip(paths_info, initial_messages):
                if isinstance(message, Exception):
                    print(f"Error fetching initial data for path {info['path']}: {message}")
                else:
                    print(f"Initial message from HTTP fetch for {info['messageName']}:")
                    print(json.dumps(message, indent=2))
                    print("-" * 50)
                    await write_to_csv(csv_writer, info['messageName'], message)
        finally:
            # Push the initial rows out before the long-running stream starts.
            await csv_file.flush()

        async with websockets.connect(websocket_url) as websocket:
            print("[open] WebSocket connection established.")
            while True:
                try:
                    event_data = await websocket.recv()
                except websockets.ConnectionClosed:
                    print("[close] WebSocket connection closed.")
                    break

                try:
                    payload = json.loads(event_data)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # UnicodeDecodeError covers bytes frames that cannot be
                    # decoded as text.
                    print("[error] Received non-JSON data from WebSocket.")
                    continue

                if not isinstance(payload, dict):
                    print("[error] Received JSON that is not an object from WebSocket.")
                    continue

                raw_header = payload.get("header", {})
                raw_message = payload.get("message", {})
                # Filtering needs dict lookups; treat anything else as empty.
                header = raw_header if isinstance(raw_header, dict) else {}
                message = raw_message if isinstance(raw_message, dict) else {}

                # A message is written once per filter it satisfies.
                for info in paths_info:
                    if not _message_matches(info, header, message):
                        continue
                    filtered_message = raw_message if raw_message else payload
                    await write_to_csv(csv_writer, info["messageName"], filtered_message)
    finally:
        # Runs on normal completion, on errors and on cancellation (Ctrl-C),
        # so buffered rows are not lost and the handle is not leaked.
        await csv_file.close()


def _message_matches(info, header, message):
    """Return True when *header*/*message* satisfy every filter set in *info*.

    A filter value of ``None`` in *info* means "do not filter on this field".
    """
    vehicle_id = info["vehicleId"]
    component_id = info["componentId"]
    message_name = info["messageName"]
    if vehicle_id is not None and header.get("system_id") != vehicle_id:
        return False
    if component_id is not None and header.get("component_id") != component_id:
        return False
    if message_name is not None and message.get("type") != message_name:
        return False
    return True
if __name__ == "__main__":
    # Script entry point: drive main() to completion on a fresh event loop.
    # Ctrl-C is reported with a short notice rather than a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[exit] Script terminated by user.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment