Skip to content

Instantly share code, notes, and snippets.

@edxmorgan
Created December 17, 2024 20:18
Show Gist options
  • Save edxmorgan/7c2086a370540b087561044509c49934 to your computer and use it in GitHub Desktop.
Save edxmorgan/7c2086a370540b087561044509c49934 to your computer and use it in GitHub Desktop.
import asyncio
import aiohttp
import json
import sys
import signal
async def get_vision_position_delta(session, url):
try:
async with session.get(url) as response:
response.raise_for_status() # Raise exception for HTTP errors
# Attempt to parse the response as JSON
try:
data = await response.json()
print("VISION_POSITION_DELTA Data:")
print(json.dumps(data, indent=4))
except aiohttp.ContentTypeError:
# If response is not JSON, handle accordingly
text = await response.text()
print("Received non-JSON response:")
print(text)
except aiohttp.ClientResponseError as http_err:
print(f"HTTP error occurred: {http_err.status} {http_err.message}", file=sys.stderr)
except aiohttp.ClientConnectionError as conn_err:
print(f"Connection error occurred: {conn_err}", file=sys.stderr)
except asyncio.TimeoutError:
print("Timeout error occurred", file=sys.stderr)
except aiohttp.ClientError as req_err:
print(f"An error occurred: {req_err}", file=sys.stderr)
async def continuous_fetch(url, interval):
timeout = aiohttp.ClientTimeout(total=10) # 10 seconds timeout
async with aiohttp.ClientSession(timeout=timeout) as session:
while True:
await get_vision_position_delta(session, url)
await asyncio.sleep(interval)
def main():
url = "http://192.168.2.2:6040/v1/mavlink/vehicles/255/components/0/messages/VISION_POSITION_DELTA"
fetch_interval = 0.005 # Interval in seconds between requests
loop = asyncio.get_event_loop()
# Handle graceful shutdown on SIGINT and SIGTERM
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame), loop.stop)
try:
print(f"Starting continuous data fetch every {fetch_interval} seconds...")
loop.create_task(continuous_fetch(url, fetch_interval))
loop.run_forever()
except KeyboardInterrupt:
print("\nReceived exit signal. Shutting down...")
finally:
loop.close()
print("Shutdown complete.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment