Advertisement
crackanddie

com port

May 21st, 2022
769
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import os
  2. import sys
  3. import time
  4. from threading import Thread
  5.  
  6. import serial
  7. from pycad import Logger
  8. from pycad import Shared
  9. from pycad.Shared import TitanStatic
  10. from pycad.Shared import VMXStatic
  11. from pycad.Funcad import Funcad
  12.  
  13.  
  14. class TitanCOM:
  15.     @classmethod
  16.     def start_com(cls) -> None:
  17.         th: Thread = Thread(target=cls.com_loop)
  18.         th.daemon = True
  19.         th.start()
  20.  
  21.     @classmethod
  22.     def com_loop(cls) -> None:
  23.         try:
  24.             ser = serial.Serial(
  25.                 port='/dev/ttyACM0',
  26.                 baudrate=115200,
  27.                 parity=serial.PARITY_NONE,
  28.                 stopbits=serial.STOPBITS_ONE,
  29.                 bytesize=serial.EIGHTBITS
  30.             )
  31.  
  32.             start_time: int = round(time.time() * 10000)
  33.             send_count_time: float = time.time()
  34.             comm_counter = 0
  35.             while True:
  36.                 rx_data: bytearray = bytearray(ser.read(48))
  37.                 ser.reset_input_buffer()  # reset buffer
  38.                 rx_time: int = round(time.time() * 10000)
  39.                 TitanCOM.set_up_rx_data(rx_data)
  40.                 Shared.info_holder.rx_com_time_dev = str(round(time.time() * 10000) - rx_time)
  41.  
  42.                 tx_time: int = round(time.time() * 10000)
  43.                 tx_data = TitanCOM.set_up_tx_data()
  44.                 Shared.info_holder.tx_com_time_dev = str(round(time.time() * 10000) - tx_time)
  45.                 ser.reset_output_buffer()  # reset buffer
  46.                 ser.write(tx_data)
  47.                 ser.flush()
  48.  
  49.                 comm_counter += 1
  50.                 if time.time() - send_count_time > 1:
  51.                     send_count_time = time.time()
  52.                     Shared.info_holder.com_count_dev = str(comm_counter)
  53.                     comm_counter = 0
  54.  
  55.                 time.sleep(0.002)
  56.                 Shared.info_holder.com_time_dev = str(round(time.time() * 10000) - start_time)
  57.                 start_time = round(time.time() * 10000)
  58.         except (Exception, serial.SerialException) as e:
  59.             exc_type, exc_obj, exc_tb = sys.exc_info()
  60.             file_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
  61.             Logger.write_com_log(" ".join(map(str, [exc_type, file_name, exc_tb.tb_lineno])))
  62.             Logger.write_com_log(str(e))
  63.  
  64.     @staticmethod
  65.     def set_up_rx_data(data: bytearray) -> None:
  66.         if data[42] != 33:
  67.             if data[0] == 1:
  68.                 if data[24] == 111:
  69.                     raw_enc_0: int = (data[2] & 0xff) << 8 | (data[1] & 0xff)
  70.                     raw_enc_1: int = (data[4] & 0xff) << 8 | (data[3] & 0xff)
  71.                     raw_enc_2: int = (data[6] & 0xff) << 8 | (data[5] & 0xff)
  72.                     raw_enc_3: int = (data[8] & 0xff) << 8 | (data[7] & 0xff)
  73.  
  74.                     TitanCOM.set_up_encoders(raw_enc_0, raw_enc_1, raw_enc_2, raw_enc_3)
  75.  
  76.                     # нужно поправить потом, красиво сделать
  77.                     TitanStatic.limit_1_m_2 = Funcad.access_bit(data[9], 5)
  78.                     TitanStatic.limit_l_m_3 = Funcad.access_bit(data[9], 4)
  79.                     TitanStatic.limit_h_m_3 = Funcad.access_bit(data[9], 6)
  80.  
  81.                     TitanStatic.limit_1_m_0 = Funcad.access_bit(data[10], 1)
  82.                     TitanStatic.limit_2_m_0 = Funcad.access_bit(data[10], 2)
  83.                     TitanStatic.limit_1_m_1 = Funcad.access_bit(data[10], 3)
  84.                     TitanStatic.limit_2_m_1 = Funcad.access_bit(data[10], 4)
  85.                     TitanStatic.limit_2_m_2 = Funcad.access_bit(data[10], 5)
  86.                     # достигнута ли позиция лифта
  87.                     TitanStatic.lift_pos_reached = Funcad.access_bit(data[10], 6)
  88.  
  89.                     sign_pid_0: int = 1 if Funcad.access_bit(data[19], 1) else -1
  90.                     sign_pid_1: int = 1 if Funcad.access_bit(data[19], 2) else -1
  91.                     sign_pid_2: int = 1 if Funcad.access_bit(data[19], 3) else -1
  92.                     sign_pid_3: int = 1 if Funcad.access_bit(data[19], 4) else -1
  93.                     sign_x: int = 1 if Funcad.access_bit(data[19], 5) else -1
  94.                     sign_y: int = 1 if Funcad.access_bit(data[19], 6) else -1
  95.  
  96.                     TitanStatic.speed_motor_0_pid = \
  97.                         ((data[12] & 0xff) << 8 | (data[11] & 0xff)) / 65535 * 100 * sign_pid_0
  98.                     TitanStatic.speed_motor_1_pid = \
  99.                         ((data[14] & 0xff) << 8 | (data[13] & 0xff)) / 65535 * 100 * sign_pid_1
  100.                     TitanStatic.speed_motor_2_pid = \
  101.                         ((data[16] & 0xff) << 8 | (data[15] & 0xff)) / 65535 * 100 * sign_pid_2
  102.                     TitanStatic.speed_motor_3_pid = \
  103.                         ((data[18] & 0xff) << 8 | (data[17] & 0xff)) / 65535 * 100 * sign_pid_3
  104.  
  105.                     TitanStatic.odo_x_pos = \
  106.                         ((data[21] & 0xff) << 8 | (data[20] & 0xff)) / 100 * sign_x
  107.                     TitanStatic.odo_y_pos = \
  108.                         ((data[23] & 0xff) << 8 | (data[22] & 0xff)) / 100 * sign_y
  109.         else:
  110.             Logger.write_com_log("received wrong data " + " ".join(map(str, data)))
  111.  
  112.     @staticmethod
  113.     def set_up_encoders(enc0: int, enc1: int, enc2: int, enc3: int) -> None:
  114.         TitanStatic.enc_motor_0 += TitanCOM.get_normal_diff(enc0, TitanStatic.raw_enc_motor_0)
  115.         TitanStatic.enc_motor_1 += TitanCOM.get_normal_diff(enc1, TitanStatic.raw_enc_motor_1)
  116.         TitanStatic.enc_motor_2 += TitanCOM.get_normal_diff(enc2, TitanStatic.raw_enc_motor_2)
  117.         TitanStatic.enc_motor_3 += TitanCOM.get_normal_diff(enc3, TitanStatic.raw_enc_motor_3)
  118.  
  119.         TitanStatic.raw_enc_motor_0 = enc0
  120.         TitanStatic.raw_enc_motor_1 = enc1
  121.         TitanStatic.raw_enc_motor_2 = enc2
  122.         TitanStatic.raw_enc_motor_3 = enc3
  123.  
  124.     @staticmethod
  125.     def get_normal_diff(curr: int, last: int) -> int:
  126.         diff: int = curr - last
  127.         if diff > 30000:
  128.             diff = -(last + (65535 - curr))
  129.         elif diff < -30000:
  130.             diff = curr + (65535 - last)
  131.         return diff
  132.  
  133.     @staticmethod
  134.     def set_up_tx_data() -> bytearray:
  135.         tx_data: bytearray = bytearray([0] * 48)
  136.         if TitanStatic.send_pid:
  137.             tx_data[0] = 2
  138.  
  139.             tx_data[1] = int('0' + ("1" if TitanStatic.use_pid_motor_0 else "0") +
  140.                              ("1" if TitanStatic.use_pid_motor_1 else "0") +
  141.                              ("1" if TitanStatic.use_pid_motor_2 else "0") +
  142.                              ("1" if TitanStatic.use_pid_motor_3 else "0") + '000', 2)
  143.  
  144.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_0[0] * 1000000))
  145.             tx_data[2] = motor_pids[1]
  146.             tx_data[3] = motor_pids[2]
  147.             tx_data[4] = motor_pids[3]
  148.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_0[1] * 1000000))
  149.             tx_data[5] = motor_pids[1]
  150.             tx_data[6] = motor_pids[2]
  151.             tx_data[7] = motor_pids[3]
  152.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_0[2] * 1000000))
  153.             tx_data[8] = motor_pids[1]
  154.             tx_data[9] = motor_pids[2]
  155.             tx_data[10] = motor_pids[3]
  156.  
  157.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_1[0] * 1000000))
  158.             tx_data[11] = motor_pids[1]
  159.             tx_data[12] = motor_pids[2]
  160.             tx_data[13] = motor_pids[3]
  161.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_1[1] * 1000000))
  162.             tx_data[14] = motor_pids[1]
  163.             tx_data[15] = motor_pids[2]
  164.             tx_data[16] = motor_pids[3]
  165.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_1[2] * 1000000))
  166.             tx_data[17] = motor_pids[1]
  167.             tx_data[18] = motor_pids[2]
  168.             tx_data[19] = motor_pids[3]
  169.  
  170.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_2[0] * 1000000))
  171.             tx_data[20] = motor_pids[1]
  172.             tx_data[21] = motor_pids[2]
  173.             tx_data[22] = motor_pids[3]
  174.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_2[1] * 1000000))
  175.             tx_data[23] = motor_pids[1]
  176.             tx_data[24] = motor_pids[2]
  177.             tx_data[25] = motor_pids[3]
  178.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_2[2] * 1000000))
  179.             tx_data[26] = motor_pids[1]
  180.             tx_data[27] = motor_pids[2]
  181.             tx_data[28] = motor_pids[3]
  182.  
  183.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_3[0] * 1000000))
  184.             tx_data[29] = motor_pids[1]
  185.             tx_data[30] = motor_pids[2]
  186.             tx_data[31] = motor_pids[3]
  187.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_3[1] * 1000000))
  188.             tx_data[32] = motor_pids[1]
  189.             tx_data[33] = motor_pids[2]
  190.             tx_data[34] = motor_pids[3]
  191.             motor_pids: bytearray = Funcad.int_to_byte_4(int(TitanStatic.pid_motor_3[2] * 1000000))
  192.             tx_data[35] = motor_pids[1]
  193.             tx_data[36] = motor_pids[2]
  194.             tx_data[37] = motor_pids[3]
  195.  
  196.             tx_data[38] = 222
  197.  
  198.             TitanStatic.send_pid = False
  199.         else:
  200.             tx_data[0] = 1
  201.  
  202.             tx_data[1] = int('1' + '0000001', 2)
  203.  
  204.             motor_speeds: bytearray = Funcad.int_to_byte_4(abs(int(TitanStatic.speed_motor_0 / 100 * 65535)))
  205.             tx_data[2] = motor_speeds[2]
  206.             tx_data[3] = motor_speeds[3]
  207.  
  208.             motor_speeds: bytearray = Funcad.int_to_byte_4(abs(int(TitanStatic.speed_motor_1 / 100 * 65535)))
  209.             tx_data[4] = motor_speeds[2]
  210.             tx_data[5] = motor_speeds[3]
  211.  
  212.             motor_speeds: bytearray = Funcad.int_to_byte_4(abs(int(TitanStatic.speed_motor_2 / 100 * 65535)))
  213.             tx_data[6] = motor_speeds[2]
  214.             tx_data[7] = motor_speeds[3]
  215.  
  216.             motor_speeds: bytearray = Funcad.int_to_byte_4(abs(int(TitanStatic.speed_motor_3 / 100 * 65535)))
  217.             tx_data[8] = motor_speeds[2]
  218.             tx_data[9] = motor_speeds[3]
  219.  
  220.             tx_data[10] = int('1' + ("1" if TitanStatic.speed_motor_0 >= 0 else "0") +
  221.                               ("1" if TitanStatic.speed_motor_1 >= 0 else "0") +
  222.                               ("1" if TitanStatic.speed_motor_2 >= 0 else "0") +
  223.                               ("1" if TitanStatic.speed_motor_3 >= 0 else "0") + '001', 2)
  224.  
  225.             lift_poss: bytearray = Funcad.int_to_byte_4(abs(int(TitanStatic.lift_pos / 10 * 65535)))
  226.             tx_data[11] = lift_poss[2]
  227.             tx_data[12] = lift_poss[3]
  228.  
  229.             tx_data[13] = int('1' + ("1" if TitanStatic.virtual_ems else "0") +
  230.                               ("1" if TitanStatic.lift_init else "0") +
  231.                               ("1" if TitanStatic.odo_reset_coords else "0") +
  232.                               ("0" if VMXStatic.yaw_unlim >= 0 else "1") +
  233.                               ("1" if TitanStatic.odo_reset_x >= 0 else "0") +
  234.                               ("1" if TitanStatic.odo_reset_y >= 0 else "0") + '1', 2)
  235.  
  236.             # на всякий случай
  237.             TitanStatic.odo_reset_coords = False
  238.  
  239.             yaw_unsigned: bytearray = Funcad.int_to_byte_4(abs(int(-VMXStatic.yaw_unlim * 10)))
  240.             tx_data[14] = yaw_unsigned[2]
  241.             tx_data[15] = yaw_unsigned[3]
  242.  
  243.             reset_x_odo: bytearray = Funcad.int_to_byte_4(abs(int(TitanStatic.odo_reset_x * 10)))
  244.             tx_data[16] = reset_x_odo[2]
  245.             tx_data[17] = reset_x_odo[3]
  246.  
  247.             reset_y_odo: bytearray = Funcad.int_to_byte_4(abs(int(TitanStatic.odo_reset_y * 10)))
  248.             tx_data[18] = reset_y_odo[2]
  249.             tx_data[19] = reset_y_odo[3]
  250.  
  251.             tx_data[20] = 222
  252.  
  253.             # if TitanStatic.speed_motor_0 > 0:
  254.             #     Logger.write_com_log("received wrong data " + " ".join(map(str, tx_data)))
  255.  
  256.         return tx_data
Advertisement
RAW Paste Data Copied
Advertisement