Last active
January 10, 2022 15:53
-
-
Save wolkenarchitekt/d7bcbe9be570f81ec38a67d46e412cbf to your computer and use it in GitHub Desktop.
Read JSON stream from FLARM Atom UAV via serial
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Prerequisites:
# * Install pyserial-asyncio==0.6 (provides the `serial_asyncio` module).
# * Add a udev rule for the FLARM Atom UAV so that it shows up as the symlink '/dev/ttyUSB-ATOM':
#     ACTION=="add", SUBSYSTEM=="tty", ATTRS{idVendor}=="10c4", ATTRS{idProduct}=="ea60", SYMLINK+="ttyUSB-ATOM"
#
import asyncio
import json
import logging
from typing import AsyncGenerator, Generator

from serial_asyncio import open_serial_connection
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
async def read_serial_to_json(
    device: str, baudrate=115200
) -> AsyncGenerator[dict, None]:
    """Yield the JSON value decoded from each line read from a serial device.

    Opens *device* with ``open_serial_connection`` and reads it line by line.
    Each non-empty line is decoded as text and parsed with ``json.loads``.

    Lines that cannot be decoded or parsed are logged and skipped instead of
    aborting the stream; the first line after opening a port is often a
    partial message because reading may start mid-transmission.

    Args:
        device: Serial port URL/path, e.g. ``/dev/ttyUSB-ATOM``.
        baudrate: Serial line speed passed to ``open_serial_connection``.

    Yields:
        The parsed JSON value of each valid line (annotated as ``dict``;
        presumably the device sends one JSON object per line -- not
        verified here).
    """
    reader, writer = await open_serial_connection(url=device, baudrate=baudrate)
    try:
        while True:
            raw_line = await reader.readline()
            if not raw_line:
                # StreamReader.readline() returns b"" only at EOF; an empty
                # line would still contain its newline. Stop instead of
                # spinning on a closed connection.
                logger.warning("Serial connection to %s reached EOF", device)
                return

            try:
                line_str = raw_line.decode().strip()
            except UnicodeDecodeError:
                logger.warning(
                    "Skipping undecodable line from %s: %r", device, raw_line
                )
                continue

            if not line_str:
                # Blank keep-alive / separator line: nothing to parse.
                continue

            try:
                data = json.loads(line_str)
            except json.JSONDecodeError:
                logger.warning(
                    "Skipping malformed JSON line from %s: %r", device, line_str
                )
                continue

            yield data
    finally:
        # Release the serial transport when the generator is exhausted,
        # closed by the consumer, or aborted by an exception.
        writer.close()
async def serial_print(device: str):
    """Log every JSON message read from *device* at DEBUG level.

    Consumes ``read_serial_to_json(device=device)`` until that stream ends.
    """
    json_stream = read_serial_to_json(device=device)
    async for data in json_stream:
        # Process JSON dictionary here
        logger.debug(f"Received data: {data}")
if __name__ == "__main__":
    # Script entry point: enable DEBUG logging and stream messages from the
    # udev-provided symlink until interrupted.
    logging.basicConfig(level=logging.DEBUG)
    try:
        # asyncio.run() creates a fresh event loop and, on exit, finalizes
        # async generators and closes the loop. asyncio.get_event_loop()
        # outside a running loop is deprecated and was never closed here.
        asyncio.run(serial_print(device="/dev/ttyUSB-ATOM"))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this endless reader; exit quietly
        # instead of printing a traceback.
        logger.info("Interrupted, exiting")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment