Advertisement
Guest User

code for stackoverflow question

a guest
Sep 26th, 2012
612
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.90 KB | None | 0 0
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import sys
  5. import os
  6. import select
  7. import socket
  8. import argparse
  9. import struct
  10. import time
  11.  
  12.  
  13. class EthernetTest(object):
  14.  
  15.     def __init__(self):
  16.         self._host = None
  17.         self._port = None
  18.         self._socket = None
  19.         self._tcpfps = 1
  20.         self._fptcp = 1
  21.         self._dur = 1
  22.         self._poll = None
  23.         self._buffer = ''
  24.         self._send_buffer = ''
  25.         self._start_time = None
  26.         self._time = None
  27.         self._acc = 0.0
  28.         self._finish_time = 0
  29.         self._out_tcp_frames = 0
  30.         self._in_tcp_frames = 0
  31.         self._out_can_frames = 0
  32.         self._in_can_frames = 0
  33.  
  34.         self.parse_args()
  35.         self.connect()
  36.         self.send_handshake()
  37.         self.get_parameters()
  38.  
  39.     def parse_args(self):
  40.         parser = argparse.ArgumentParser(
  41.             description='Ethernet test'
  42.         )
  43.         parser.add_argument(
  44.             '--host',
  45.             dest='host',
  46.             action='store',
  47.             default='192.168.2.160',
  48.             help='Host address',
  49.         )
  50.         parser.add_argument(
  51.             '--port',
  52.             dest='port',
  53.             action='store',
  54.             default='1204',
  55.             help='Port',
  56.         )
  57.         args = parser.parse_args()
  58.         self._host = args.host
  59.         self._port = args.port
  60.  
  61.     def connect(self):
  62.         try:
  63.             self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  64.         except socket.error, msg:
  65.             sys.stderr.write("[ERROR] %s\n" % msg[1])
  66.             sys.exit(1)
  67.         try:
  68.             self._socket.connect((self._host, int(self._port)))
  69.         except socket.error, msg:
  70.             sys.stderr.write("[ERROR] %s\n" % msg[1])
  71.             sys.exit(2)
  72.         self._socket.setblocking(0)
  73.         self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  74.  
  75.     def send_handshake(self):
  76.         print u'First frame with info.'
  77.         data = struct.pack('>BBBBBBB', 1, 0, 0, 0, 0, 0, 0)
  78.         number = raw_input(u'Input number of filter frames or press enter to set default data (0701AAAA1):')
  79.         if number != "":
  80.             data += struct.pack('>B', int(number))
  81.             for nr in range(int(number)):
  82.                 fid = raw_input(u'Input frame ID:')
  83.                 ch = raw_input(u'Input channel number:')
  84.                 print fid[0:4]
  85.                 print fid[4:8]
  86.                 data += struct.pack('>HHB', int(fid[0:4], 16), int(fid[4:8], 16), int(ch))
  87.         else:
  88.             data += struct.pack('>B', 1)
  89.             data += struct.pack('>HHB', int('0701', 16), int('AAAA', 16), 1)
  90.         self._socket.send(data)
  91.  
  92.     def get_parameters(self):
  93.         self._tcpfps = float(raw_input(u'Input number of TCP frames per second (float)'))
  94.         self._fptcp = int(raw_input(u'Input number of CAN frames per one TCP frame (int)'))
  95.         self._dur = float(raw_input(u'Input time duration of test (seconds)'))
  96.         self._finish_time = time.time() + self._dur
  97.         self.test_loop()
  98.  
  99.     def test_loop(self):
  100.         self._start_time = time.time()
  101.         self._time = time.time()
  102.         self._acc = 1.0 / self._tcpfps
  103.         while True:
  104.             if self._send_buffer:
  105.                 bytes = self._socket.send(self._send_buffer)
  106.                 self._send_buffer = self._send_buffer[bytes:]
  107.             diff = time.time() - self._time
  108.             self._acc += diff
  109.             self._time = time.time()
  110.             self.read()
  111.             while self._acc >= 1.0 / self._tcpfps:
  112.                 self._acc -= 1.0 / self._tcpfps
  113.                 data = struct.pack('>BBBBBBBB', 8, 0, 0, 0, 0, 0, 0, self._fptcp)
  114.                 for nr in range(self._fptcp):
  115.                     data += struct.pack('>IBBBBBBBBB', int('0701BBBB', 16), 8, 0, 0, 0, 0, 0, 0, 0, 0)
  116.                 bytes = self._socket.send(data)
  117.                 self._send_buffer += data[bytes:]
  118.                 self._out_tcp_frames += 1
  119.                 self._out_can_frames += self._fptcp
  120.                 self.status()
  121.             if self._time >= self._finish_time:
  122.                 print u'Test finished'
  123.                 raw_input(u'Press enter to close')
  124.                 break
  125.         sys.exit()
  126.  
  127.     def read(self):
  128.         try:
  129.             data = self._socket.recv(128)
  130.             self._buffer += data
  131.         except:
  132.             pass
  133.         if len(self._buffer) >= 8:
  134.             count = struct.unpack('>B', self._buffer[7])[0]
  135.             if len(self._buffer) >= 8 + (count * 13):
  136.                 self._in_tcp_frames += 1
  137.                 self._in_can_frames += count
  138.                 frame = self._buffer[:(8 + (count * 13))]
  139.                 frame_hex = ''
  140.                 for byte in frame[0:8]:
  141.                     frame_hex += str(struct.unpack('>B', byte)[0])
  142.                 frame = frame[8:]
  143.                 for x in range(count):
  144.                     frame_hex += ' '
  145.                     for byte in frame[0:4]:
  146.                         frame_hex += str(hex(struct.unpack('>B', byte)[0]))[2:]
  147.                     frame = frame[4:]
  148.                     for byte in frame[0:9]:
  149.                         frame_hex += str(struct.unpack('>B', byte)[0])
  150.                     frame = frame[9:]
  151.                 print frame_hex
  152.                 self._buffer = self._buffer[(8 + (count * 13)):]
  153.  
  154.     def status(self):
  155.         diff = self._time - self._start_time
  156.         f1 = self._out_tcp_frames / diff
  157.         f2 = self._out_can_frames / diff
  158.         f3 = self._in_tcp_frames / diff
  159.         f4 = self._in_can_frames / diff
  160.         print u'Sent frames TCP: %s. %s/s' % (str(self._out_tcp_frames), str(f1))
  161.         print u'Sent frames CAN: %s. %s/s' % (str(self._out_can_frames), str(f2))
  162.         print u'Read frames TCP: %s. %s/s' % (str(self._in_tcp_frames), str(f3))
  163.         print u'Read frames CAN: %s. %s/s' % (str(self._in_can_frames), str(f4))
  164.  
  165.  
  166. EthernetTest()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement