Created
September 3, 2015 02:33
-
-
Save eliotb/5b647e5498831710f551 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""
General CAN functions using Kvaser canlib
"""
from __future__ import print_function | |
import logging | |
from sys import exit | |
# kvaser canlib | |
from canlib import * | |
def setup(port_id, bitrate=canBITRATE_250K):
    """Open a Kvaser CAN channel and put it on the bus.

    port_id may be either a device channel number or a device serial number.
    bitrate is handed to the channel's setBusParams (default canBITRATE_250K).

    Returns a (channel, canlib) tuple.

    If port_id matches no discovered channel, the known channels are printed
    and the process exits with a non-zero status.
    """
    cl = canlib()
    channels = cl.getNumberOfChannels()
    logging.info("Kvaser canlib version: %s, %d channels available",
                 cl.getVersion(), channels)

    # Map every accepted port_id (channel number or serial number) to a
    # (channel, serial, name, EAN) tuple.
    chans = {}
    for ch in range(channels):
        try:
            info = (ch,
                    int(cl.getChannelData_Serial(ch)),
                    cl.getChannelData_Name(ch),
                    cl.getChannelData_EAN(ch))
        except canError as ex:
            # A channel that cannot be queried is reported and skipped.
            print(ex)
            continue
        chans[ch] = info  # key on channel number
        # Only key on the serial when it cannot be mistaken for a channel
        # number.
        if info[1] >= channels:
            chans[info[1]] = info  # key on serial number

    try:
        ch = chans[port_id][0]
    except KeyError:
        print('A valid port number is required for --port option:')
        for key in sorted(chans):
            print('%d: %s' % (key, chans[key]))
        print()
        # Non-zero status: an invalid port is a failure, not a clean exit.
        exit(1)

    chan = cl.openChannel(ch, canOPEN_ACCEPT_VIRTUAL | canOPEN_REQUIRE_EXTENDED)
    chan.ioCtl_set_timer_scale(100)  # microseconds per tick
    chan.timer_scale = 100
    chan.setBusOutputControl(canDRIVER_NORMAL)
    chan.setBusParams(bitrate)
    chan.busOn()
    return chan, cl
# Combined mask of the error-frame, NERR and hardware/software overrun flags.
error_flags = (
    canMSG_ERROR_FRAME
    | canMSG_NERR
    | canMSGERR_HW_OVERRUN
    | canMSGERR_SW_OVERRUN
)
def flags_str(flags):
    """Return the names of the CAN message flags set in flags.

    The names of all set flags are joined with '|', e.g. 'RTR|STD'.
    An empty string is returned when none of the known flags is set.
    """
    flag_names = (
        (canMSG_RTR, 'RTR'),            # 1 Message is a remote request
        (canMSG_STD, 'STD'),            # 2 Standard (11-bit) identifier
        (canMSG_EXT, 'EXT'),            # 4 Extended (29-bit) identifier
        (canMSG_WAKEUP, 'WAKE'),        # 8 WAKEUP message (SWC hardware)
        (canMSG_ERROR_FRAME, 'ERR'),    # 32 Message represents an error frame
        # The following flags can be returned from canRead() et al, but
        # cannot be passed to canWrite():
        (canMSG_NERR, 'NERR'),          # 16 NERR was active during the message
        (canMSG_TXACK, 'TXACK'),        # 64 TX ACK (message was really sent)
        (canMSG_TXRQ, 'TXRQ'),          # 128 TX REQ (passed to CAN controller)
        (canMSGERR_HW_OVERRUN, 'HWOVR'),  # 512 Hardware buffer overrun
        (canMSGERR_SW_OVERRUN, 'SWOVR'),  # 1024 Software buffer overrun
    )
    # The separator is added by join only; a name must not carry its own '|',
    # otherwise the result contains a doubled separator.
    return '|'.join(name for mask, name in flag_names if flags & mask)
def add_options(p):
    """Register the CAN related command line options on optparse parser p."""
    option_specs = (
        (('-b', '--bus'),
         dict(type='int', default=1,
              help='CAN bus index 1 or 2. Default=%default')),
        (('-p', '--port'),
         dict(type='int', default=0,
              help='CAN channel #, typically 0 or 1, or device serial')),
        (('-F', '--fake'),
         dict(action='store_true',
              help='Send packets direct to localhost MQTT broker instead of serial port')),
    )
    for switches, settings in option_specs:
        p.add_option(*switches, **settings)
if __name__ == '__main__':
    # Simple self test: with a first argument starting with 'w' random
    # standard-ID packets are written to the bus, otherwise packets are read
    # from the bus and printed grouped by CAN ID.
    from collections import defaultdict
    from optparse import OptionParser
    import random
    import time

    def parse_options():
        """Parse the command line; return (opts, args).

        opts.loglevel is translated from the numeric 1-4 option value to the
        matching logging level (any other value selects DEBUG).
        """
        p = OptionParser()
        add_options(p)
        p.add_option('-l', '--loglevel', type='int', default=3,
                     help='Logging level. Default=%default')
        opts, args = p.parse_args()
        level = {1: logging.ERROR, 2: logging.WARNING, 3: logging.INFO,
                 4: logging.DEBUG}.get(opts.loglevel, logging.DEBUG)
        opts.loglevel = level
        return opts, args

    opts, args = parse_options()
    logging.basicConfig(level=opts.loglevel)
    ch, lib = setup(opts.port)
    num_messages = 100

    # No positional argument means "receive"; avoid IndexError on args[0].
    mode = args[0] if args else ''
    if mode.startswith('w'):
        max_std = 2047  # largest standard (11-bit) identifier
        logging.info('Generating %d packets to %s (port#%d)',
                     num_messages, ch.getChannelData_Name(), opts.port)
        sent = []
        for i in range(num_messages):
            # 1 to 8 random data bytes, then a random standard ID.
            msg = [random.randint(0, 255)
                   for _ in range(random.randint(1, 8))]
            can_id = random.randint(0, max_std)
            sent.append(can_id)
            print(can_id, msg)
            ch.write(can_id, msg)
        print(sent)
    else:
        msgs_received = defaultdict(list)
        logging.info('Receiving %d packets from %s (port#%d)',
                     num_messages, ch.getChannelData_Name(), opts.port)
        for i in range(num_messages):
            try:
                # NOTE: timeout seems to be in milliseconds (ideally it would
                # be float seconds).
                msg = ch.read(5000)
            except canNoMsg:
                logging.warning('No messages received for 5 seconds')
                continue
            except Exception as e:
                logging.error(str(e))
                continue
            # msg[0] is used as the CAN ID key; msg[1] and msg[2] are printed
            # below as data and dlc.
            msgs_received[msg[0]].append(msg)

        # sorted() works on both Python 2 and 3; dict.items() has no .sort()
        # on Python 3.
        for k, v in sorted(msgs_received.items()):
            print('Rx ID: %08X' % k)
            for msg in v:
                print("dlc: %08X msg: %s" % (msg[2], list(msg[1])))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment