Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import sys
- import os
- import select
- import socket
- import argparse
- import struct
- import time
class EthernetTest(object):
    """Interactive throughput test over a single TCP connection.

    Constructing an instance runs the whole session: it parses the command
    line, connects, sends a handshake frame describing frame-ID filters,
    asks for the test parameters and then runs the send/receive loop until
    the requested duration has elapsed. The loop ends with ``sys.exit()``,
    so the constructor does not return normally.

    Wire layout, as used by this class:

    * handshake: 7 bytes ``01 00 00 00 00 00 00``, one byte with the number
      of filters, then per filter the frame ID as two big-endian 16-bit
      halves followed by a one-byte channel number;
    * data frame: an 8-byte header whose last byte is the record count,
      followed by ``count`` records of 13 bytes (4-byte big-endian ID and
      9 further bytes). The prompts call the outer frame a "TCP frame" and
      each record a "CAN frame".

    The code is written to run unchanged on Python 2.6+ and Python 3.
    """

    HEADER_SIZE = 8    # bytes in a data-frame header; the last one is the record count
    RECORD_SIZE = 13   # bytes per record: 4-byte ID + 9 further bytes
    RECV_CHUNK = 128   # maximum number of bytes requested per recv() call

    def __init__(self):
        """Initialise the state, then run the complete interactive test."""
        self._host = None
        self._port = None
        self._socket = None
        self._tcpfps = 1
        self._fptcp = 1
        self._dur = 1
        self._poll = None
        # Both buffers hold raw bytes (b'' is plain str on Python 2).
        self._buffer = b''
        self._send_buffer = b''
        self._start_time = None
        self._time = None
        self._acc = 0.0
        self._finish_time = 0
        self._out_tcp_frames = 0
        self._in_tcp_frames = 0
        self._out_can_frames = 0
        self._in_can_frames = 0
        self.parse_args()
        self.connect()
        self.send_handshake()
        self.get_parameters()

    @staticmethod
    def _prompt(text):
        """Show *text* and return one line typed by the user, without eval."""
        try:
            reader = raw_input  # Python 2
        except NameError:
            reader = input      # Python 3
        return reader(text)

    @staticmethod
    def _describe_error(err):
        """Return a readable message for a socket error.

        Indexing the exception (``err[1]``) fails on Python 3 and on errors
        carrying a single argument, so fall back to ``str(err)``.
        """
        return getattr(err, 'strerror', None) or str(err)

    @staticmethod
    def _build_data_frame(count):
        """Return one outgoing data frame carrying *count* identical records."""
        header = struct.pack('>BBBBBBBB', 8, 0, 0, 0, 0, 0, 0, count)
        record = struct.pack('>IBBBBBBBBB', 0x0701BBBB, 8, 0, 0, 0, 0, 0, 0, 0, 0)
        return header + record * count

    @classmethod
    def _format_frame(cls, frame):
        """Render a complete received frame as the text printed by read().

        Header bytes and the 9 trailing bytes of every record are printed in
        decimal without separators. The 4 ID bytes are printed as two-digit
        hex so that leading zeros are kept (``07 01`` is ``0701``, not
        ``71``). Records are separated from what precedes them by a space.
        """
        octets = bytearray(frame)
        parts = [''.join(str(b) for b in octets[:cls.HEADER_SIZE])]
        for start in range(cls.HEADER_SIZE, len(octets), cls.RECORD_SIZE):
            record = octets[start:start + cls.RECORD_SIZE]
            ident = ''.join('%02x' % b for b in record[:4])
            rest = ''.join(str(b) for b in record[4:])
            parts.append(ident + rest)
        return ' '.join(parts)

    def parse_args(self):
        """Read ``--host`` and ``--port`` from the command line."""
        parser = argparse.ArgumentParser(
            description='Ethernet test'
        )
        parser.add_argument(
            '--host',
            dest='host',
            action='store',
            default='192.168.2.160',
            help='Host address',
        )
        parser.add_argument(
            '--port',
            dest='port',
            action='store',
            default='1204',
            help='Port',
        )
        args = parser.parse_args()
        self._host = args.host
        self._port = args.port

    def connect(self):
        """Open the TCP connection and switch it to non-blocking mode.

        Writes ``[ERROR] ...`` to stderr and exits with status 1 when the
        socket cannot be created, or status 2 when connecting fails.
        """
        try:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error as err:
            sys.stderr.write("[ERROR] %s\n" % self._describe_error(err))
            sys.exit(1)
        try:
            self._socket.connect((self._host, int(self._port)))
        except socket.error as err:
            sys.stderr.write("[ERROR] %s\n" % self._describe_error(err))
            self._socket.close()
            sys.exit(2)
        self._socket.setblocking(0)
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    def _flush_send_buffer(self):
        """Try once to push queued bytes to the socket without blocking.

        Whatever the socket does not accept stays queued, in order, for the
        next call.
        """
        if not self._send_buffer:
            return
        # Ask first: send() on a full non-blocking socket would raise.
        if not select.select([], [self._socket], [], 0)[1]:
            return
        sent = self._socket.send(self._send_buffer)
        self._send_buffer = self._send_buffer[sent:]

    def send_handshake(self):
        """Ask for the frame-ID filters and send the handshake frame.

        An empty answer to the first prompt selects a single default filter
        (ID ``0701AAAA``, channel 1). Otherwise each frame ID is expected as
        8 hex digits; malformed numbers raise ``ValueError``.
        """
        print(u'First frame with info.')
        data = struct.pack('>BBBBBBB', 1, 0, 0, 0, 0, 0, 0)
        number = self._prompt(u'Input number of filter frames or press enter to set default data (0701AAAA1):')
        if number != "":
            filter_count = int(number)
            data += struct.pack('>B', filter_count)
            for _ in range(filter_count):
                fid = self._prompt(u'Input frame ID:')
                ch = self._prompt(u'Input channel number:')
                print(fid[0:4])
                print(fid[4:8])
                data += struct.pack('>HHB', int(fid[0:4], 16), int(fid[4:8], 16), int(ch))
        else:
            data += struct.pack('>B', 1)
            data += struct.pack('>HHB', int('0701', 16), int('AAAA', 16), 1)
        # Go through the send queue so a partial send is completed later
        # and always precedes the data frames.
        self._send_buffer += data
        self._flush_send_buffer()

    def get_parameters(self):
        """Ask for rate, records per frame and duration, then run the test.

        Raises:
            ValueError: if an answer is not a number, the frame rate is not
                positive, or the record count does not fit the one-byte
                count field (0..255).
        """
        self._tcpfps = float(self._prompt(u'Input number of TCP frames per second (float)'))
        if not self._tcpfps > 0:
            raise ValueError('TCP frames per second must be greater than 0')
        self._fptcp = int(self._prompt(u'Input number of CAN frames per one TCP frame (int)'))
        if not 0 <= self._fptcp <= 255:
            raise ValueError('CAN frames per TCP frame must be in range 0..255')
        self._dur = float(self._prompt(u'Input time duration of test (seconds)'))
        self._finish_time = time.time() + self._dur
        self.test_loop()

    def _idle(self, timeout):
        """Sleep until the socket needs attention or *timeout* seconds pass."""
        pending_out = [self._socket] if self._send_buffer else []
        select.select([self._socket], pending_out, [], max(0.0, timeout))

    def test_loop(self):
        """Send frames at the requested rate and read replies until time is up.

        A time accumulator decides how many frames are due; it starts at one
        full period so the first frame goes out immediately. Statistics are
        printed after every queued frame. When the duration has elapsed the
        user is asked to press enter and the process exits.
        """
        period = 1.0 / self._tcpfps
        self._start_time = self._time = time.time()
        self._acc = period
        while True:
            self._flush_send_buffer()
            now = time.time()
            self._acc += now - self._time
            self._time = now
            self.read()
            while self._acc >= period:
                self._acc -= period
                # Always append to the queue: writing a new frame directly
                # while older bytes are still queued would reorder the stream.
                self._send_buffer += self._build_data_frame(self._fptcp)
                self._flush_send_buffer()
                self._out_tcp_frames += 1
                self._out_can_frames += self._fptcp
                self.status()
            if self._time >= self._finish_time:
                print(u'Test finished')
                self._prompt(u'Press enter to close')
                break
            # Wait for the next due frame instead of spinning the CPU.
            self._idle(min(period - self._acc, self._finish_time - self._time))
        self._socket.close()
        sys.exit()

    def read(self):
        """Receive pending bytes and print every complete frame in the buffer.

        Does not block. If the peer has closed the connection, an error is
        written to stderr and the process exits with status 3.
        """
        if select.select([self._socket], [], [], 0)[0]:
            chunk = self._socket.recv(self.RECV_CHUNK)
            if not chunk:
                sys.stderr.write("[ERROR] Connection closed by peer\n")
                sys.exit(3)
            self._buffer += chunk
        self._drain_frames()

    def _drain_frames(self):
        """Count, print and remove all complete frames at the buffer start."""
        while len(self._buffer) >= self.HEADER_SIZE:
            count = struct.unpack_from('>B', self._buffer, self.HEADER_SIZE - 1)[0]
            size = self.HEADER_SIZE + count * self.RECORD_SIZE
            if len(self._buffer) < size:
                break  # frame not fully received yet
            frame = self._buffer[:size]
            self._buffer = self._buffer[size:]
            self._in_tcp_frames += 1
            self._in_can_frames += count
            print(self._format_frame(frame))

    def status(self):
        """Print totals and per-second rates for sent and received frames.

        Rates are reported as 0.0 while no measurable time has elapsed.
        """
        elapsed = self._time - self._start_time

        def rate(total):
            return total / elapsed if elapsed > 0 else 0.0

        print(u'Sent frames TCP: %s. %s/s' % (str(self._out_tcp_frames), str(rate(self._out_tcp_frames))))
        print(u'Sent frames CAN: %s. %s/s' % (str(self._out_can_frames), str(rate(self._out_can_frames))))
        print(u'Read frames TCP: %s. %s/s' % (str(self._in_tcp_frames), str(rate(self._in_tcp_frames))))
        print(u'Read frames CAN: %s. %s/s' % (str(self._in_can_frames), str(rate(self._in_can_frames))))
# Run the interactive test only when executed as a script: constructing
# EthernetTest parses argv, opens a connection and prompts on stdin, which
# must not happen as a side effect of importing this module.
if __name__ == '__main__':
    EthernetTest()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement