Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- """
- General CAN functions using Kvaser canlib
- """
- from __future__ import print_function
- import logging
- from sys import exit
- # kvaser canlib
- from canlib import *
- def setup(port_id, bitrate=canBITRATE_250K):
- """Setup kvaser CAN given port_id which may be device channel or serial number"""
- cl = canlib()
- channels = cl.getNumberOfChannels()
- logging.info("Kvaser canlib version: %s, %d channels available" %
- (cl.getVersion(), channels))
- chans = {}
- for ch in range(0, channels):
- try:
- info = (ch,
- int(cl.getChannelData_Serial(ch)),
- cl.getChannelData_Name(ch),
- cl.getChannelData_EAN(ch))
- chans[ch] = info # key on channel number
- if info[1] >= channels:
- chans[info[1]] = info # key on serial number
- except (canError) as ex:
- print(ex)
- try:
- ch = chans[port_id][0]
- except KeyError:
- print('A valid port number is required for --port option:')
- for k in chans.items():
- print('%d: %s' % k)
- print()
- exit()
- chan = cl.openChannel(ch, canOPEN_ACCEPT_VIRTUAL | canOPEN_REQUIRE_EXTENDED)
- chan.ioCtl_set_timer_scale(100) # microseconds per tick
- chan.timer_scale = 100
- chan.setBusOutputControl(canDRIVER_NORMAL)
- chan.setBusParams(bitrate)
- chan.busOn()
- return chan, cl
- error_flags = (canMSG_ERROR_FRAME | canMSG_NERR |
- canMSGERR_HW_OVERRUN | canMSGERR_SW_OVERRUN)
- def flags_str(flags):
- fl = []
- if flags & canMSG_RTR: # 1 Message is a remote request
- fl.append('RTR|')
- if flags & canMSG_STD: #2 Message has a standard (11-bit) identifier
- fl.append('STD')
- if flags & canMSG_EXT: #4 Message has a extended (29-bit) identifier
- fl.append('EXT')
- if flags & canMSG_WAKEUP: #8 Message is a WAKEUP message (SWC hardware.)
- fl.append('WAKE')
- if flags & canMSG_ERROR_FRAME: #32 Message represents an error frame.
- fl.append('ERR')
- #The following flags can be returned from canRead() et al, but cannot be passed to canWrite():
- if flags & canMSG_NERR: # 16 NERR was active during the message (TJA1054 etc. hardware. See Note 4 below.)/tr>
- fl.append('NERR')
- if flags & canMSG_TXACK : # 64 Message is a TX ACK (meaning that the message was really sent)/tr>
- fl.append('TXACK')
- if flags & canMSG_TXRQ: # 128 Message is a TX REQ (meaning that the message was transferred to the CAN controller)/tr>
- fl.append('TXRQ')
- if flags & canMSGERR_HW_OVERRUN: # 512 Hardware buffer overrun.
- fl.append('HWOVR')
- if flags & canMSGERR_SW_OVERRUN: # 1024 Software buffer overrun.
- fl.append('SWOVR')
- return '|'.join(fl)
- def add_options(p):
- '''Add CAN related options to an optparse instance'''
- p.add_option('-b', '--bus', type='int', default=1,
- help='CAN bus index 1 or 2. Default=%default')
- p.add_option('-p', '--port', type='int', default=0,
- help='CAN channel #, typically 0 or 1, or device serial')
- p.add_option('-F', '--fake', action='store_true',
- help='Send packets direct to localhost MQTT broker instead of serial port')
- if __name__ == '__main__':
- from collections import defaultdict
- from optparse import OptionParser
- import random
- import time
- def parse_options():
- p = OptionParser()
- add_options(p)
- p.add_option('-l', '--loglevel', type='int', default=3,
- help='Logging level. Default=%default')
- opts, args = p.parse_args()
- level = {1: logging.ERROR, 2: logging.WARNING, 3: logging.INFO,
- 4: logging.DEBUG}.get(opts.loglevel, logging.DEBUG)
- opts.loglevel = level
- return opts, args
- opts, args = parse_options()
- logging.basicConfig(level=opts.loglevel)
- busname = 'CAN-%d' % opts.bus
- ch, lib = setup(opts.port)
- num_messages = 100
- if args[0].startswith('w'): #
- max_std = 2047
- logging.info('Generating %d packets to %s (port#%d)' % (num_messages, ch.getChannelData_Name(), opts.port))
- sent = []
- for i in range(num_messages):
- msg = []
- for m in range(random.randint(1, 8)):
- msg.append(random.randint(0, 255))
- id = random.randint(0, max_std)
- sent.append(id)
- print(id, msg)
- ch.write(id, msg)
- print(sent)
- else:
- msgs_received = defaultdict(list)
- logging.info('Receiving %d packets from %s (port#%d)' % (num_messages, ch.getChannelData_Name(), opts.port))
- for i in range(num_messages):
- try:
- msg = ch.read(5000) # Ohhh, timeout seems to be in milliseconds, should be float seconds!
- except canNoMsg as e:
- logging.warning('No messages received for 5 seconds')
- continue
- except Exception as e:
- logging.error(str(e))
- continue
- # print("%d rx id: %08X msg: %s" % (i, id, [x for x in msg]))
- msgs_received[msg[0]].append(msg)
- ml = msgs_received.items()
- ml.sort()
- for k, v in ml:
- print('Rx ID: %08X' % k)
- for msg in v:
- print("dlc: %08X msg: %s" % (msg[2], [x for x in msg[1]]))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement