Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import errno
- import functools
- import tornado.ioloop
- import socket
- import struct
# Layout of one raw CAN frame as exchanged with the socket, in native byte
# order with no implicit alignment ("="):
#   I   - unsigned 32-bit identifier field
#   B   - unsigned 8-bit payload length
#   3x  - three pad bytes
#   8s  - eight payload bytes
can_frame_fmt = "=IB3x8s"

# Number of bytes occupied by one packed frame in the layout above.
can_frame_size = struct.Struct(can_frame_fmt).size
def build_can_frame(can_id, data):
    """Pack an identifier and payload into a raw CAN frame.

    Args:
        can_id: Integer stored in the unsigned 32-bit identifier field.
        data: Payload as ``bytes``; at most 8 bytes. Shorter payloads are
            right-padded with NUL bytes to fill the 8-byte data field.

    Returns:
        The packed frame as ``bytes``, laid out according to
        ``can_frame_fmt``.

    Raises:
        ValueError: If ``data`` is longer than 8 bytes.
        struct.error: If ``can_id`` does not fit the identifier field.
    """
    max_payload = 8
    can_dlc = len(data)
    if can_dlc > max_payload:
        # The "8s" field silently truncates longer input while the length
        # byte would still carry the untruncated size, producing a frame
        # whose declared length disagrees with its data. Reject it instead.
        raise ValueError(
            "CAN payload must be at most %d bytes, got %d"
            % (max_payload, can_dlc)
        )
    padded = data.ljust(max_payload, b'\x00')
    return struct.pack(can_frame_fmt, can_id, can_dlc, padded)
def dissect_can_frame(frame):
    """Unpack a raw CAN frame into its identifier, length and payload.

    Args:
        frame: Packed frame bytes matching the ``can_frame_fmt`` layout.

    Returns:
        A ``(can_id, can_dlc, data)`` tuple in which ``data`` is cut down to
        the first ``can_dlc`` bytes of the 8-byte data field.
    """
    fields = struct.unpack(can_frame_fmt, frame)
    identifier, length, payload = fields
    # Drop the padding: keep only as many bytes as the length field reports.
    trimmed = payload[:length]
    return (identifier, length, trimmed)
def connection_ready(sock, fd, events):
    """Read one CAN frame from ``sock`` and dissect it.

    Intended as an IOLoop handler: the script binds ``sock`` with
    ``functools.partial`` and the loop supplies ``fd`` and ``events``.

    Args:
        sock: Socket to read from; one frame of ``can_frame_size`` bytes is
            requested per call.
        fd: File descriptor passed by the event loop (unused here).
        events: Event mask passed by the event loop (unused here).

    Returns:
        The ``(can_id, can_dlc, data)`` tuple from ``dissect_can_frame``, or
        ``None`` when no data could be read.
    """
    # This is a plain function, so there is no instance to carry a logger;
    # log through a module-level logger instead.
    import logging

    logger = logging.getLogger(__name__)

    try:
        cf, _addr = sock.recvfrom(can_frame_size)
    except BlockingIOError:
        logger.debug('Captured no data, socket in non-blocking mode.')
        return None
    except socket.timeout:
        logger.debug('Captured no data, socket read timed out.')
        return None

    frame = dissect_can_frame(cf)
    # Make the decoded frame observable rather than discarding it.
    logger.debug('Received CAN frame: id=0x%x dlc=%d data=%r', *frame)
    return frame
if __name__ == '__main__':
    # Open a raw CAN socket on interface 'can0' and switch it to
    # non-blocking mode so reads never stall the event loop.
    sock = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    sock.bind(('can0',))
    sock.setblocking(False)

    # Bind the socket as the handler's first argument; the loop supplies
    # the remaining (fd, events) arguments on each read event.
    callback = functools.partial(connection_ready, sock)

    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.add_handler(
        sock.fileno(),
        callback,
        tornado.ioloop.IOLoop.READ,
    )
    io_loop.start()
Add Comment
Please, Sign In to add comment