Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Script para interceptar el protocolo serial de analizadores de gases"""
- import datetime
- import os
- import sys
- # pylint: disable=import-error
- import serial
- # pylint: enable=import-error
- import interpretar_cadena
- TIEMPO_DE_ESPERA = 0.1
- CADENAS_PEDIR_DATOS = {
- b'\x02\x60\x03': 'comando_ms',
- b'\x03\x00\x02\x00\x00\x00\x00\x00\x05\x00': 'comando_amb',
- b'\x49\x01\x20\x96': 'comando_cap',
- b'\x41\x01\x20\x9E': 'comando_cap',
- b'\x54\x01\x20\x8B': 'comando_cap',
- b'\x05\x32\x30\x02\x32\x30\x33\x03\x30\x31\x04': 'comando_bb',
- b'\x10\x01\x40\xAF': 'comando_hor',
- }
- ARCHIVO_DATOS = datetime.datetime.now().strftime("%Y%m%d%H%M") + '.csv'
- def hex_string(buffer):
- """Convierte un buffer a valores hexadecimales para imprimir más bonito"""
- return ' '.join(["{0:02X}".format(x) for x in buffer])
- def analizar_cadena(string, protocol):
- """Analiza los datos dependiendo del protocolo"""
- def traer_valor(diccionario, key):
- value = diccionario.get(key)
- if value is not None:
- return str(value)
- return '-'
- datos = getattr(interpretar_cadena, protocol)(string)
- if datos:
- # Si no existe el archivo csv, creelo
- if not os.path.isfile(ARCHIVO_DATOS):
- os.mknod(ARCHIVO_DATOS)
- with open(ARCHIVO_DATOS, 'w') as archivo:
- # archivo.write("Fecha,Hora,HC,CO,CO2,O2\n")
- archivo.write("Hora,HC,CO,CO2,O2\n")
- ahora = datetime.datetime.now()
- data = [
- # ahora.strftime("%Y-%m-%d"),
- ahora.strftime("%H:%M:%S"),
- traer_valor(datos, 'hc'),
- traer_valor(datos, 'co'),
- traer_valor(datos, 'co2'),
- traer_valor(datos, 'o2')
- ]
- linea = ','.join(data) + "\n"
- with open(ARCHIVO_DATOS, 'a') as archivo:
- archivo.write(linea)
- def print_datetime():
- """Imprime la fecha y la hora"""
- print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), end=' -- ')
- def imprimir_desde_el_master(string):
- """Imprime el mensaje recibido desde el master, returna el protocolo"""
- print_datetime()
- print("Recibido desde el master:", hex_string(string))
- if string in CADENAS_PEDIR_DATOS:
- protocol = CADENAS_PEDIR_DATOS[string]
- print('Esta cadena corresponde a pedir datos del protocolo ' + protocol)
- print("----------")
- return protocol
- print(string)
- print("----------")
- return None
- def imprimir_respuesta(string, protocol):
- """Imprime la respuesta recibida desde el slave, si hay dato de protocolo la interpreta"""
- print_datetime()
- print("Respuesta:", hex_string(string))
- if protocol:
- analizar_cadena(string, protocol)
- print("----------")
- return
- print(string)
- print("----------")
- def determinar_seriales():
- """
- Si se pasan los puertos seriales como argumentos al script los asigna
- en caso contrario toma los puertos USB que haya
- """
- port = ""
- if len(sys.argv) > 2:
- port = sys.argv[2]
- else:
- port = "/dev/ttyO2"
- serial1 = serial.Serial(port, timeout=1)
- if len(sys.argv) > 1:
- port = sys.argv[1]
- else:
- port = "/dev/ttyO5"
- serial0 = serial.Serial(port, timeout=1)
- return serial0, serial1
- def main():
- """Función principal"""
- cambio = False
- ultimo_leido = "master"
- master_str = b""
- slave_str = b""
- protocolo = ''
- master, slave = determinar_seriales()
- print("MASTER:", master.port)
- print("SLAVE:", slave.port)
- print()
- while True:
- while master.inWaiting():
- leido = master.read(master.inWaiting())
- master_str += leido
- if ultimo_leido != "master":
- cambio = True
- ultimo_leido = "master"
- if cambio and ultimo_leido == "master":
- imprimir_respuesta(slave_str, protocolo)
- protocolo = ''
- slave_str = b""
- cambio = False
- if cambio and ultimo_leido == "slave":
- protocolo = imprimir_desde_el_master(master_str)
- master_str = b""
- cambio = False
- while slave.inWaiting():
- leido = slave.read(slave.inWaiting())
- slave_str += leido
- if ultimo_leido != "slave":
- cambio = True
- ultimo_leido = "slave"
- if cambio and ultimo_leido == "master":
- imprimir_respuesta(slave_str, protocolo)
- protocolo = ''
- slave_str = b""
- cambio = False
- if cambio and ultimo_leido == "slave":
- protocolo = imprimir_desde_el_master(master_str)
- master_str = b""
- cambio = False
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement