Skip to content

Instantly share code, notes, and snippets.

@azlkiniue
Last active September 22, 2024 15:25
Show Gist options
  • Save azlkiniue/df7966ab27fcc1e953634d07b279af4d to your computer and use it in GitHub Desktop.
Save azlkiniue/df7966ab27fcc1e953634d07b279af4d to your computer and use it in GitHub Desktop.
Script to forward CAN bus logs (read via SocketCAN) to WebSocket clients
#!/usr/bin/env python
import can
import asyncio
from typing import List
from can.notifier import MessageRecipient
from websockets.asyncio.server import broadcast, serve
# WebSocket connections that are currently open. handler() adds a connection
# when it starts and removes it when the connection closes; broadcast_can()
# sends every formatted CAN frame to all members of this set.
connections = set()
# Delay value; no active code visible in this file reads it.
interval = 0.1 # in seconds
# Channel name passed to can.Bus() together with interface='socketcan'.
device = 'can0'
async def handler(websocket):
    """Keep ``websocket`` registered in ``connections`` until it closes.

    The connection is added to the module-level set on entry and removed
    again once ``wait_closed()`` returns or raises, so the set only ever
    holds connections whose handler is still running.
    """
    connections.add(websocket)
    try:
        # Nothing is read from the client; just park until it goes away.
        await websocket.wait_closed()
    finally:
        connections.remove(websocket)
def _format_can_message(msg) -> str:
    """Render one CAN message as ``(timestamp) channel ID#DATA``.

    The layout follows python-can's own ``Message.__str__``
    (see ``can/message.py`` in the hardbyte/python-can repository):

    * the timestamp is right-aligned in 15 columns with 6 decimals,
    * the arbitration ID is 8 hex digits for extended frames, 3 otherwise,
    * the payload is upper-case hex, truncated to ``msg.dlc`` bytes.

    The original code right-padded the payload to 24 columns and then
    stripped the joined string, which removed that padding again; the
    padding is therefore omitted here and the result is the same.
    """
    payload_len = min(msg.dlc, len(msg.data))
    data_hex = msg.data[:payload_len].hex().upper()
    id_width = 8 if msg.is_extended_id else 3
    frame_id = f"{msg.arbitration_id:0{id_width}X}"
    return f"({msg.timestamp:>15.6f}) {msg.channel} {frame_id}#{data_hex}"


async def broadcast_can():
    """Read frames from the SocketCAN ``device`` and broadcast them.

    Opens the bus, attaches an ``AsyncBufferedReader`` through a
    ``can.Notifier`` bound to the running event loop, and sends every
    received message, formatted as text, to all sockets in ``connections``.

    The notifier is stopped in a ``finally`` clause: the reader loop only
    ends through an exception or task cancellation, so a ``stop()`` placed
    after the loop would be skipped exactly when it is needed.
    """
    with can.Bus(channel=device, interface='socketcan') as bus:
        reader = can.AsyncBufferedReader()
        listeners: List[MessageRecipient] = [
            reader,  # AsyncBufferedReader() listener
        ]
        loop = asyncio.get_running_loop()
        notifier = can.Notifier(bus, listeners, loop=loop)
        try:
            async for msg in reader:
                if not msg:
                    continue
                broadcast(connections, _format_can_message(msg))
        finally:
            # Stop the notifier before the surrounding ``with`` shuts the
            # bus down, whatever ended the loop.
            notifier.stop()
async def broadcast_messages():
    """Print the start-up banner, then forward CAN traffic without end.

    ``broadcast_can()`` is awaited again every time it returns; any
    exception it raises propagates to the caller.
    """
    print("WebSocket server started on port 8765")
    while True:
        await broadcast_can()
async def main():
    """Run the WebSocket server on 0.0.0.0:8765 while forwarding CAN frames.

    The server stays up for as long as ``broadcast_messages()`` is running
    and is closed when that coroutine exits.
    """
    server = serve(handler, "0.0.0.0", 8765)
    async with server:
        # run forever
        await broadcast_messages()
# Script entry point: start the event loop only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment