Advertisement
alejandrotecnimaq

Script interceptor

May 7th, 2021
901
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.85 KB | None | 0 0
  1. """Script para interceptar el protocolo serial de analizadores de gases"""
  2.  
  3. import datetime
  4. import os
  5. import sys
  6. # pylint: disable=import-error
  7. import serial
  8. # pylint: enable=import-error
  9. import interpretar_cadena
  10.  
  11. TIEMPO_DE_ESPERA = 0.1
  12.  
  13. CADENAS_PEDIR_DATOS = {
  14.     b'\x02\x60\x03': 'comando_ms',
  15.     b'\x03\x00\x02\x00\x00\x00\x00\x00\x05\x00': 'comando_amb',
  16.     b'\x49\x01\x20\x96': 'comando_cap',
  17.     b'\x41\x01\x20\x9E': 'comando_cap',
  18.     b'\x54\x01\x20\x8B': 'comando_cap',
  19.     b'\x05\x32\x30\x02\x32\x30\x33\x03\x30\x31\x04': 'comando_bb',
  20.     b'\x10\x01\x40\xAF': 'comando_hor',
  21. }
  22.  
  23. ARCHIVO_DATOS = datetime.datetime.now().strftime("%Y%m%d%H%M") + '.csv'
  24.  
  25.  
  26. def hex_string(buffer):
  27.     """Convierte un buffer a valores hexadecimales para imprimir más bonito"""
  28.     return ' '.join(["{0:02X}".format(x) for x in buffer])
  29.  
  30.  
  31. def analizar_cadena(string, protocol):
  32.     """Analiza los datos dependiendo del protocolo"""
  33.     def traer_valor(diccionario, key):
  34.         value = diccionario.get(key)
  35.         if value is not None:
  36.             return str(value)
  37.  
  38.         return '-'
  39.     datos = getattr(interpretar_cadena, protocol)(string)
  40.     if datos:
  41.         # Si no existe el archivo csv, creelo
  42.         if not os.path.isfile(ARCHIVO_DATOS):
  43.             os.mknod(ARCHIVO_DATOS)
  44.             with open(ARCHIVO_DATOS, 'w') as archivo:
  45.                 # archivo.write("Fecha,Hora,HC,CO,CO2,O2\n")
  46.                 archivo.write("Hora,HC,CO,CO2,O2\n")
  47.  
  48.         ahora = datetime.datetime.now()
  49.         data = [
  50.             # ahora.strftime("%Y-%m-%d"),
  51.             ahora.strftime("%H:%M:%S"),
  52.             traer_valor(datos, 'hc'),
  53.             traer_valor(datos, 'co'),
  54.             traer_valor(datos, 'co2'),
  55.             traer_valor(datos, 'o2')
  56.         ]
  57.         linea = ','.join(data) + "\n"
  58.         with open(ARCHIVO_DATOS, 'a') as archivo:
  59.             archivo.write(linea)
  60.  
  61.  
  62. def print_datetime():
  63.     """Imprime la fecha y la hora"""
  64.     print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), end=' -- ')
  65.  
  66.  
  67. def imprimir_desde_el_master(string):
  68.     """Imprime el mensaje recibido desde el master, returna el protocolo"""
  69.     print_datetime()
  70.     print("Recibido desde el master:", hex_string(string))
  71.     if string in CADENAS_PEDIR_DATOS:
  72.         protocol = CADENAS_PEDIR_DATOS[string]
  73.         print('Esta cadena corresponde a pedir datos del protocolo ' + protocol)
  74.         print("----------")
  75.         return protocol
  76.  
  77.     print(string)
  78.     print("----------")
  79.     return None
  80.  
  81.  
  82. def imprimir_respuesta(string, protocol):
  83.     """Imprime la respuesta recibida desde el slave, si hay dato de protocolo la interpreta"""
  84.     print_datetime()
  85.     print("Respuesta:", hex_string(string))
  86.     if protocol:
  87.         analizar_cadena(string, protocol)
  88.         print("----------")
  89.         return
  90.  
  91.     print(string)
  92.     print("----------")
  93.  
  94.  
  95. def determinar_seriales():
  96.     """
  97.    Si se pasan los puertos seriales como argumentos al script los asigna
  98.    en caso contrario toma los puertos USB que haya
  99.    """
  100.     port = ""
  101.     if len(sys.argv) > 2:
  102.         port = sys.argv[2]
  103.     else:
  104.         port = "/dev/ttyO2"
  105.     serial1 = serial.Serial(port, timeout=1)
  106.  
  107.     if len(sys.argv) > 1:
  108.         port = sys.argv[1]
  109.     else:
  110.         port = "/dev/ttyO5"
  111.  
  112.     serial0 = serial.Serial(port, timeout=1)
  113.  
  114.     return serial0, serial1
  115.  
  116.  
  117. def main():
  118.     """Función principal"""
  119.     cambio = False
  120.     ultimo_leido = "master"
  121.     master_str = b""
  122.     slave_str = b""
  123.     protocolo = ''
  124.  
  125.     master, slave = determinar_seriales()
  126.  
  127.     print("MASTER:", master.port)
  128.     print("SLAVE:", slave.port)
  129.     print()
  130.     while True:
  131.         while master.inWaiting():
  132.             leido = master.read(master.inWaiting())
  133.             master_str += leido
  134.             if ultimo_leido != "master":
  135.                 cambio = True
  136.                 ultimo_leido = "master"
  137.  
  138.         if cambio and ultimo_leido == "master":
  139.             imprimir_respuesta(slave_str, protocolo)
  140.             protocolo = ''
  141.             slave_str = b""
  142.             cambio = False
  143.  
  144.         if cambio and ultimo_leido == "slave":
  145.             protocolo = imprimir_desde_el_master(master_str)
  146.             master_str = b""
  147.             cambio = False
  148.  
  149.         while slave.inWaiting():
  150.             leido = slave.read(slave.inWaiting())
  151.             slave_str += leido
  152.             if ultimo_leido != "slave":
  153.                 cambio = True
  154.                 ultimo_leido = "slave"
  155.  
  156.         if cambio and ultimo_leido == "master":
  157.             imprimir_respuesta(slave_str, protocolo)
  158.             protocolo = ''
  159.             slave_str = b""
  160.             cambio = False
  161.  
  162.         if cambio and ultimo_leido == "slave":
  163.             protocolo = imprimir_desde_el_master(master_str)
  164.             master_str = b""
  165.             cambio = False
  166.  
  167.  
  168. if __name__ == '__main__':
  169.     main()
  170.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement