Created
December 19, 2020 15:16
-
-
Save ryang14/8d09939b9aa15716d4fbeca4fe982905 to your computer and use it in GitHub Desktop.
A canio-based CAN node library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import time | |
import board | |
import canio | |
import digitalio | |
# Create a node on a CAN network
# Uses the 29-bit extended message ID, which contains both a node ID and a topic ID
# The node ID is the source when sending from a node and the destination when sending from the controller to a node
# This library assumes a network with one controller and multiple nodes
#
# Usage:
#     from cannode import Node
#
#     node = Node(42)
#
#     while True:
#         node.run()
class Node:
    """A node on a CAN network, built on CircuitPython's ``canio``.

    Every frame uses an extended message ID whose low 24 bits are laid out
    as one byte of node ID followed by a 16-bit big-endian topic ID. A node
    receives frames addressed to its own node ID and frames addressed to
    the broadcast ID (255). Received frames are dispatched to the callback
    registered for their topic ID.

    Call :meth:`run` repeatedly from the main loop; it polls for one
    incoming message and sends a heartbeat every ``heartbeat_rate`` seconds.
    """

    # Node ID used in the message ID of frames meant for every node.
    BROADCAST_ID = 255
    # Topic ID on which heartbeat messages are sent and received.
    HEARTBEAT_TOPIC = 0

    def __init__(self, node_id, rx=board.CAN_RX, tx=board.CAN_TX, timeout=.001):
        """Bring up the CAN peripheral and register the heartbeat listener.

        node_id -- this node's ID (packed into a single byte of the message ID)
        rx, tx  -- pins passed to ``canio.CAN``
        timeout -- receive timeout, in seconds, used for every listener this
                   node creates (including after ``add_listener`` restarts it)
        """
        self.node_id = node_id
        self.timeout = timeout
        self.callbacks = dict()
        self.matches = []
        self.heartbeat_rate = 1
        self.__last_heartbeat = time.monotonic()
        self.listener = None

        # Setup CAN stuff
        # If the CAN transceiver has a standby pin, bring it out of standby mode.
        # The pin object is stored on the instance so it lives as long as the node.
        if hasattr(board, 'CAN_STANDBY'):
            self._standby = digitalio.DigitalInOut(board.CAN_STANDBY)
            self._standby.switch_to_output(False)

        # If the CAN transceiver is powered by a boost converter, turn on its supply
        if hasattr(board, 'BOOST_ENABLE'):
            self._boost_enable = digitalio.DigitalInOut(board.BOOST_ENABLE)
            self._boost_enable.switch_to_output(True)

        # canio library objects
        self.can = canio.CAN(rx=rx, tx=tx,
                             baudrate=250_000, auto_restart=True)

        # Add a heartbeat listener for debug. This also creates self.listener,
        # so no unfiltered (empty match list) listener is ever opened.
        self.add_listener(self.HEARTBEAT_TOPIC, self.handle_heartbeat)

    @staticmethod
    def _message_id(node_id, topic_id):
        """Pack a node ID (1 byte) and topic ID (2 bytes) into a message ID."""
        return int.from_bytes(struct.pack(">BH", node_id, topic_id), 'big')

    # Handle heartbeat messages
    # This is used for debug
    def handle_heartbeat(self, message):
        """Print the node ID and millisecond timestamp carried by a heartbeat."""
        node_id, now_ms = struct.unpack("<II", message.data)
        print(f"received heartbeat from node {node_id}: now_ms={now_ms}")

    # Add a callback function that will be called when a message with the corresponding topic ID is received
    # The message object will be passed to the function as a parameter
    def add_listener(self, topic_id, callback):
        """Register ``callback`` for ``topic_id`` and restart the listener.

        Registering a topic that already has a callback replaces the callback
        without adding duplicate match filters.
        """
        if topic_id not in self.callbacks:
            # Listen to messages addressed to this node
            self.matches.append(
                canio.Match(self._message_id(self.node_id, topic_id), extended=True))
            # Also listen to broadcast messages
            self.matches.append(
                canio.Match(self._message_id(self.BROADCAST_ID, topic_id), extended=True))

        self.callbacks[topic_id] = callback

        # Restart the CAN bus listener with the added matches, keeping the
        # timeout that was given to the constructor.
        if self.listener is not None:
            self.listener.deinit()
        self.listener = self.can.listen(matches=self.matches, timeout=self.timeout)

    # Listen for new messages and pass them to the corresponding callback
    def listen(self):
        """Receive at most one message and dispatch it to its topic callback.

        Returns without dispatching when there is no listener, when nothing
        arrives within the timeout, when the frame is not a data message, or
        when the payload is not exactly 8 bytes long.
        """
        if self.listener is None:
            return

        message = self.listener.receive()
        if message is None:
            # No message received within timeout
            return

        # receive() may also yield a remote transmission request, which
        # carries no payload to hand to a callback.
        if not isinstance(message, canio.Message):
            return

        data = message.data
        if len(data) != 8:
            print(f"Unusual message length {len(data)}")
            return

        # The pad byte skips the top byte of the 32-bit ID; the node ID part
        # is not needed for dispatch.
        _node_id, topic_id = struct.unpack(">xBH", message.id.to_bytes(4, 'big'))
        callback = self.callbacks.get(topic_id)
        if callback is not None:
            callback(message)

    # Send a message
    def send(self, topic_id, data):
        """Send ``data`` on ``topic_id`` with this node's ID in the message ID."""
        message_id = self._message_id(self.node_id, topic_id)
        self.can.send(canio.Message(message_id, data, extended=True))

    # Send a heartbeat message
    def heartbeat(self):
        """Send this node's ID and the monotonic time in ms (wrapped to 32 bits)."""
        now_ms = (time.monotonic_ns() // 1_000_000) & 0xffffffff
        self.send(self.HEARTBEAT_TOPIC, struct.pack("<II", self.node_id, now_ms))

    def run(self):
        """Poll for one message, then send a heartbeat if one is due."""
        self.listen()

        now = time.monotonic()
        if now - self.__last_heartbeat >= self.heartbeat_rate:
            self.__last_heartbeat = now
            self.heartbeat()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment