alejandrotecnimaq

Script interceptor

May 7th, 2021
731
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. """Script para interceptar el protocolo serial de analizadores de gases"""
  2.  
  3. import datetime
  4. import os
  5. import sys
  6. # pylint: disable=import-error
  7. import serial
  8. # pylint: enable=import-error
  9. import interpretar_cadena
  10.  
  11. TIEMPO_DE_ESPERA = 0.1
  12.  
  13. CADENAS_PEDIR_DATOS = {
  14.     b'\x02\x60\x03': 'comando_ms',
  15.     b'\x03\x00\x02\x00\x00\x00\x00\x00\x05\x00': 'comando_amb',
  16.     b'\x49\x01\x20\x96': 'comando_cap',
  17.     b'\x41\x01\x20\x9E': 'comando_cap',
  18.     b'\x54\x01\x20\x8B': 'comando_cap',
  19.     b'\x05\x32\x30\x02\x32\x30\x33\x03\x30\x31\x04': 'comando_bb',
  20.     b'\x10\x01\x40\xAF': 'comando_hor',
  21. }
  22.  
  23. ARCHIVO_DATOS = datetime.datetime.now().strftime("%Y%m%d%H%M") + '.csv'
  24.  
  25.  
  26. def hex_string(buffer):
  27.     """Convierte un buffer a valores hexadecimales para imprimir más bonito"""
  28.     return ' '.join(["{0:02X}".format(x) for x in buffer])
  29.  
  30.  
  31. def analizar_cadena(string, protocol):
  32.     """Analiza los datos dependiendo del protocolo"""
  33.     def traer_valor(diccionario, key):
  34.         value = diccionario.get(key)
  35.         if value is not None:
  36.             return str(value)
  37.  
  38.         return '-'
  39.     datos = getattr(interpretar_cadena, protocol)(string)
  40.     if datos:
  41.         # Si no existe el archivo csv, creelo
  42.         if not os.path.isfile(ARCHIVO_DATOS):
  43.             os.mknod(ARCHIVO_DATOS)
  44.             with open(ARCHIVO_DATOS, 'w') as archivo:
  45.                 # archivo.write("Fecha,Hora,HC,CO,CO2,O2\n")
  46.                 archivo.write("Hora,HC,CO,CO2,O2\n")
  47.  
  48.         ahora = datetime.datetime.now()
  49.         data = [
  50.             # ahora.strftime("%Y-%m-%d"),
  51.             ahora.strftime("%H:%M:%S"),
  52.             traer_valor(datos, 'hc'),
  53.             traer_valor(datos, 'co'),
  54.             traer_valor(datos, 'co2'),
  55.             traer_valor(datos, 'o2')
  56.         ]
  57.         linea = ','.join(data) + "\n"
  58.         with open(ARCHIVO_DATOS, 'a') as archivo:
  59.             archivo.write(linea)
  60.  
  61.  
  62. def print_datetime():
  63.     """Imprime la fecha y la hora"""
  64.     print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), end=' -- ')
  65.  
  66.  
  67. def imprimir_desde_el_master(string):
  68.     """Imprime el mensaje recibido desde el master, returna el protocolo"""
  69.     print_datetime()
  70.     print("Recibido desde el master:", hex_string(string))
  71.     if string in CADENAS_PEDIR_DATOS:
  72.         protocol = CADENAS_PEDIR_DATOS[string]
  73.         print('Esta cadena corresponde a pedir datos del protocolo ' + protocol)
  74.         print("----------")
  75.         return protocol
  76.  
  77.     print(string)
  78.     print("----------")
  79.     return None
  80.  
  81.  
  82. def imprimir_respuesta(string, protocol):
  83.     """Imprime la respuesta recibida desde el slave, si hay dato de protocolo la interpreta"""
  84.     print_datetime()
  85.     print("Respuesta:", hex_string(string))
  86.     if protocol:
  87.         analizar_cadena(string, protocol)
  88.         print("----------")
  89.         return
  90.  
  91.     print(string)
  92.     print("----------")
  93.  
  94.  
  95. def determinar_seriales():
  96.     """
  97.    Si se pasan los puertos seriales como argumentos al script los asigna
  98.    en caso contrario toma los puertos USB que haya
  99.    """
  100.     port = ""
  101.     if len(sys.argv) > 2:
  102.         port = sys.argv[2]
  103.     else:
  104.         port = "/dev/ttyO2"
  105.     serial1 = serial.Serial(port, timeout=1)
  106.  
  107.     if len(sys.argv) > 1:
  108.         port = sys.argv[1]
  109.     else:
  110.         port = "/dev/ttyO5"
  111.  
  112.     serial0 = serial.Serial(port, timeout=1)
  113.  
  114.     return serial0, serial1
  115.  
  116.  
  117. def main():
  118.     """Función principal"""
  119.     cambio = False
  120.     ultimo_leido = "master"
  121.     master_str = b""
  122.     slave_str = b""
  123.     protocolo = ''
  124.  
  125.     master, slave = determinar_seriales()
  126.  
  127.     print("MASTER:", master.port)
  128.     print("SLAVE:", slave.port)
  129.     print()
  130.     while True:
  131.         while master.inWaiting():
  132.             leido = master.read(master.inWaiting())
  133.             master_str += leido
  134.             if ultimo_leido != "master":
  135.                 cambio = True
  136.                 ultimo_leido = "master"
  137.  
  138.         if cambio and ultimo_leido == "master":
  139.             imprimir_respuesta(slave_str, protocolo)
  140.             protocolo = ''
  141.             slave_str = b""
  142.             cambio = False
  143.  
  144.         if cambio and ultimo_leido == "slave":
  145.             protocolo = imprimir_desde_el_master(master_str)
  146.             master_str = b""
  147.             cambio = False
  148.  
  149.         while slave.inWaiting():
  150.             leido = slave.read(slave.inWaiting())
  151.             slave_str += leido
  152.             if ultimo_leido != "slave":
  153.                 cambio = True
  154.                 ultimo_leido = "slave"
  155.  
  156.         if cambio and ultimo_leido == "master":
  157.             imprimir_respuesta(slave_str, protocolo)
  158.             protocolo = ''
  159.             slave_str = b""
  160.             cambio = False
  161.  
  162.         if cambio and ultimo_leido == "slave":
  163.             protocolo = imprimir_desde_el_master(master_str)
  164.             master_str = b""
  165.             cambio = False
  166.  
  167.  
  168. if __name__ == '__main__':
  169.     main()
  170.  
RAW Paste Data