Advertisement
Guest User

Untitled

a guest
Jun 16th, 2025
18
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.84 KB | Software | 0 0
"""
integrated_lora_mppt.py

This script sends a query frame over LoRa (via the USB-TO-LoRa adapter,
COM3 @ 115200 baud by default) to the SX1262-LoRa-DTU, which forwards it to
the MPPT at 9600 bps. It then reads back the response and parses the data.
Logging and retries are included for robustness.
"""
  8.  
  9. import serial
  10. import serial.tools.list_ports
  11. import struct
  12. import time
  13. import logging
  14. from datetime import datetime
  15.  
  16. # === Configuration ===
  17. LORA_PORT      = 'COM3'    # USB-TO-LoRa-xF serial port
  18. LORA_BAUD      = 115200    # Baud rate for USB-TO-LoRa :contentReference[oaicite:7]{index=7}
  19. MPPT_BAUD      = 9600      # Baud rate expected by MPPT via DTU RS-485 :contentReference[oaicite:8]{index=8}
  20. QUERY_INTERVAL = 5         # Seconds between automatic queries
  21. SERIAL_TIMEOUT = 1         # Read timeout in seconds
  22. MAX_RETRIES    = 3         # Number of retries on failure
  23.  
  24. # MPPT query frame from your existing code
  25. # Adapt the frame bytes if needed; here using TelePython example:
  26. QUERY_FRAME = bytes([0x01, 0xB3, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00])  # Last byte for checksum appended below
  27.  
  28. def calculate_checksum(frame_bytes: bytes) -> int:
  29.     """
  30.    Calculate 8-bit checksum as sum(frame) & 0xFF, matching the MPPT protocol.
  31.    """
  32.     return sum(frame_bytes) & 0xFF
  33.  
  34. def build_query_frame() -> bytes:
  35.     """
  36.    Build and return the full query frame with checksum.
  37.    """
  38.     # Base frame as in TelePython: [0x01, 0xB3, 0x01, 0x00,0x00,0x00,0x00]
  39.     # If your protocol differs, adjust accordingly.
  40.     base = bytearray([0x01, 0xB3, 0x01, 0x00, 0x00, 0x00, 0x00])
  41.     chksum = calculate_checksum(base)
  42.     base.append(chksum)
  43.     return bytes(base)
  44.  
  45. def parse_response(response: bytes) -> dict:
  46.     """
  47.    Parse the raw response bytes from MPPT and return a dict of values.
  48.    Adjust offsets/scales per your MPPT’s documented protocol.
  49.    """
  50.     # Example based on TelePython (response length expected ~37 bytes)
  51.     if len(response) < 18:
  52.         raise ValueError(f"Response too short: {len(response)} bytes")
  53.  
  54.     # Offsets per TelePython:
  55.     # response[6:8]  -> PV voltage (uint16 >> /10)
  56.     # response[8:10] -> Battery voltage (uint16 >> /100)
  57.     # response[10:12]-> Charging current (uint16 >> /100)
  58.     # response[12:14]-> Internal temp (uint16 >> /10)
  59.     # response[16:18]-> External temp (uint16 >> /10)
  60.     data = {}
  61.     try:
  62.         pv_voltage = (response[6] << 8 | response[7]) / 10.0
  63.         battery_voltage = (response[8] << 8 | response[9]) / 100.0
  64.         charging_current = (response[10] << 8 | response[11]) / 100.0
  65.         internal_temp = (response[12] << 8 | response[13]) / 10.0
  66.         external_temp = (response[16] << 8 | response[17]) / 10.0
  67.  
  68.         operating_status = response[3]
  69.         charging_status = response[4]
  70.         control_status = response[5]
  71.  
  72.         # Timestamp
  73.         now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  74.  
  75.         data.update({
  76.             'timestamp': now,
  77.             'pv_voltage_V': pv_voltage,
  78.             'battery_voltage_V': battery_voltage,
  79.             'charging_current_A': charging_current,
  80.             'internal_temp_C': internal_temp,
  81.             'external_temp_C': external_temp,
  82.             'operating_status_bits': format(operating_status, '08b'),
  83.             'charging_status_bits': format(charging_status, '08b'),
  84.             'control_status_bits': format(control_status, '08b'),
  85.             'raw_length': len(response),
  86.         })
  87.     except IndexError as ex:
  88.         raise ValueError(f"Parsing failed, unexpected response length: {len(response)}") from ex
  89.  
  90.     return data
  91.  
  92. def find_serial_port_by_keyword(keyword: str) -> str:
  93.     """
  94.    Optionally identify ports by keyword in description or name.
  95.    E.g., 'Silicon Labs' or partial VID/PID match. Returns first match or raises.
  96.    """
  97.     ports = serial.tools.list_ports.comports()
  98.     for p in ports:
  99.         if keyword.lower() in (p.description or '').lower() or keyword.lower() in (p.device or '').lower():
  100.             return p.device
  101.     raise IOError(f"No serial port matching keyword '{keyword}' found among: {[p.device for p in ports]}")
  102.  
  103. def open_lora_port(port_name: str) -> serial.Serial:
  104.     """
  105.    Open and return a Serial instance for the USB-TO-LoRa module.
  106.    """
  107.     ser = serial.Serial(port=port_name,
  108.                         baudrate=LORA_BAUD,
  109.                         bytesize=serial.EIGHTBITS,
  110.                         parity=serial.PARITY_NONE,
  111.                         stopbits=serial.STOPBITS_ONE,
  112.                         timeout=SERIAL_TIMEOUT)
  113.     logging.info(f"Opened LoRa serial port {port_name} @ {LORA_BAUD}bps")
  114.     return ser
  115.  
  116. def send_query_and_receive(ser: serial.Serial) -> bytes:
  117.     """
  118.    Send the query frame over LoRa serial port and read the MPPT response.
  119.    Retries if timeout or incorrect length.
  120.    """
  121.     frame = build_query_frame()
  122.     for attempt in range(1, MAX_RETRIES+1):
  123.         logging.debug(f"Attempt {attempt}: sending frame {frame.hex()}")
  124.         ser.reset_input_buffer()
  125.         ser.write(frame)
  126.         # Wait a bit for round-trip (LoRa TX + MPPT processing + LoRa RX)
  127.         time.sleep(0.5 + QUERY_INTERVAL*0)  # base delay; adjust if needed
  128.         # Read whatever bytes have arrived
  129.         resp = ser.read(256)  # read up to 256 bytes; timeouts used
  130.         if resp:
  131.             logging.debug(f"Received raw response (len={len(resp)}): {resp.hex()}")
  132.             return resp
  133.         else:
  134.             logging.warning(f"No response on attempt {attempt}, retrying...")
  135.     raise TimeoutError(f"No response after {MAX_RETRIES} attempts")
  136.  
  137. def main():
  138.     # Configure logging to console
  139.     logging.basicConfig(level=logging.INFO,
  140.                         format='%(asctime)s [%(levelname)s] %(message)s',
  141.                         datefmt='%H:%M:%S')
  142.  
  143.     # Optionally auto-detect LoRa port by keyword; fallback to LORA_PORT
  144.     try:
  145.         port = find_serial_port_by_keyword('Silicon')  # example keyword
  146.     except Exception:
  147.         port = LORA_PORT
  148.     logging.info(f"Using LoRa port: {port}")
  149.  
  150.     try:
  151.         ser_lora = open_lora_port(port)
  152.     except Exception as e:
  153.         logging.error(f"Failed to open LoRa port: {e}")
  154.         return
  155.  
  156.     try:
  157.         while True:
  158.             try:
  159.                 raw = send_query_and_receive(ser_lora)
  160.                 data = parse_response(raw)
  161.                 # Display parsed data
  162.                 logging.info("MPPT Data:")
  163.                 for k, v in data.items():
  164.                     logging.info(f"  {k}: {v}")
  165.             except Exception as ex:
  166.                 logging.error(f"Error during query/parse: {ex}")
  167.             # Wait before next query
  168.             time.sleep(QUERY_INTERVAL)
  169.     except KeyboardInterrupt:
  170.         logging.info("Interrupted by user; exiting.")
  171.     finally:
  172.         ser_lora.close()
  173.         logging.info("Serial port closed.")
  174.  
  175. if __name__ == '__main__':
  176.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement