Skip to content

Instantly share code, notes, and snippets.

@anecdata
Created November 5, 2023 01:31
Show Gist options
  • Save anecdata/c3cccddb54037fed2928a7c18fa05a8e to your computer and use it in GitHub Desktop.
Save anecdata/c3cccddb54037fed2928a7c18fa05a8e to your computer and use it in GitHub Desktop.
CAN Bus Sender & Receiver test code
import struct
import time
import board
import busio
import digitalio
try:
import canio
NATIVE = True
except ImportError:
NATIVE = False
import adafruit_mcp2515.canio as canio
from adafruit_mcp2515 import MCP2515
#
# CAN LISTENER
#
DEBUG = True
RCVRID = 0x100
MSGTIMEOUT = 5
def can_recv_msg():
while True:
msg = listener.receive()
if (can_bus.transmit_error_count > 0) or (can_bus.receive_error_count > 0):
print(f"πŸ”΄ MSG tx_err={can_bus.transmit_error_count} rx_err={can_bus.receive_error_count}")
if msg is None:
if DEBUG: print("🟑 MSG not received within timeout")
continue
if DEBUG: print(f"MSG {msg.data} from={hex(msg.id)}")
if isinstance(msg, canio.Message):
break
else:
if DEBUG: print("🟑 not a canio message")
return msg
time.sleep(3) # wait for serial
print(f"{'='*25}")
# assume matched hardware
if NATIVE:
can_bus = canio.CAN(rx=board.RX, tx=board.TX, baudrate=1_000_000, auto_restart=True)
else:
cs = digitalio.DigitalInOut(board.D5)
cs.switch_to_output()
spi = board.SPI()
can_bus = MCP2515(spi, cs, baudrate=500_000)
listener = can_bus.listen(timeout=MSGTIMEOUT)
old_bus_state = None
while True:
bus_state = can_bus.state
if bus_state != old_bus_state:
print(f"🟣 BUS state changed to {bus_state}")
old_bus_state = bus_state
print(f"Receiving...")
if DEBUG: print(f"MSG avail={listener.in_waiting()} unread={can_bus.unread_message_count}")
msg = can_recv_msg()
print(f"{'-'*25}")
@anecdata
Copy link
Author

anecdata commented Nov 5, 2023

import random
import math
import struct
import time
import board
import busio
import digitalio
try:
    import canio
    NATIVE = True
except ImportError:
    NATIVE = False
    import adafruit_mcp2515.canio as canio
    from adafruit_mcp2515 import MCP2515


#
# CAN SENDER
#
DEBUG = False
SENDERID = 0x400


def can_send_msg(data):
    message = canio.Message(id=SENDERID, data=data)
    can_bus.send(message)
    if (can_bus.transmit_error_count > 0) or (can_bus.receive_error_count > 0): 
        print(f"πŸ”΄ MSG tx_err={can_bus.transmit_error_count} rx_err={can_bus.receive_error_count}")
    print(f"MSG {data} len={len(data)}")
    return


time.sleep(3)  # wait for serial
print(f"{'='*25}")

# assume matched hardware
if NATIVE:
    can_bus = canio.CAN(rx=board.RX, tx=board.TX, baudrate=1_000_000, auto_restart=True)
else:
    cs = digitalio.DigitalInOut(board.D5)
    cs.switch_to_output()
    spi = board.SPI()
    can_bus = MCP2515(spi, cs, baudrate=500_000)

old_bus_state = None
while True:
    bus_state = can_bus.state
    if bus_state != old_bus_state:
        print(f"🟣 BUS state changed to {bus_state}")
        old_bus_state = bus_state

    print(f"Sending...")
    data = "".join(random.choice("0123456789ABCDEF") for i in range(random.randint(1, 6))).encode()
    can_send_msg(data)

    print(f"{'-'*25}")
    time.sleep(0.1)

@anecdata
Copy link
Author

anecdata commented Nov 5, 2023

@anecdata
Copy link
Author

anecdata commented Nov 5, 2023

CAN_MCP2515

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment