Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- integrated_lora_mppt.py
- This script sends a query frame over LoRa (via USB-TO-LoRa COM3 @115200)
- to the SX1262-LoRa-DTU, which forwards to the MPPT at 9600 bps, reads back the
- response, and parses the data. Logging and retries are included for robustness.
- """
- import serial
- import serial.tools.list_ports
- import struct
- import time
- import logging
- from datetime import datetime
- # === Configuration ===
- LORA_PORT = 'COM3' # USB-TO-LoRa-xF serial port
- LORA_BAUD = 115200 # Baud rate for USB-TO-LoRa :contentReference[oaicite:7]{index=7}
- MPPT_BAUD = 9600 # Baud rate expected by MPPT via DTU RS-485 :contentReference[oaicite:8]{index=8}
- QUERY_INTERVAL = 5 # Seconds between automatic queries
- SERIAL_TIMEOUT = 1 # Read timeout in seconds
- MAX_RETRIES = 3 # Number of retries on failure
- # MPPT query frame from your existing code
- # Adapt the frame bytes if needed; here using TelePython example:
- QUERY_FRAME = bytes([0x01, 0xB3, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00]) # Last byte for checksum appended below
- def calculate_checksum(frame_bytes: bytes) -> int:
- """
- Calculate 8-bit checksum as sum(frame) & 0xFF, matching the MPPT protocol.
- """
- return sum(frame_bytes) & 0xFF
- def build_query_frame() -> bytes:
- """
- Build and return the full query frame with checksum.
- """
- # Base frame as in TelePython: [0x01, 0xB3, 0x01, 0x00,0x00,0x00,0x00]
- # If your protocol differs, adjust accordingly.
- base = bytearray([0x01, 0xB3, 0x01, 0x00, 0x00, 0x00, 0x00])
- chksum = calculate_checksum(base)
- base.append(chksum)
- return bytes(base)
- def parse_response(response: bytes) -> dict:
- """
- Parse the raw response bytes from MPPT and return a dict of values.
- Adjust offsets/scales per your MPPT’s documented protocol.
- """
- # Example based on TelePython (response length expected ~37 bytes)
- if len(response) < 18:
- raise ValueError(f"Response too short: {len(response)} bytes")
- # Offsets per TelePython:
- # response[6:8] -> PV voltage (uint16 >> /10)
- # response[8:10] -> Battery voltage (uint16 >> /100)
- # response[10:12]-> Charging current (uint16 >> /100)
- # response[12:14]-> Internal temp (uint16 >> /10)
- # response[16:18]-> External temp (uint16 >> /10)
- data = {}
- try:
- pv_voltage = (response[6] << 8 | response[7]) / 10.0
- battery_voltage = (response[8] << 8 | response[9]) / 100.0
- charging_current = (response[10] << 8 | response[11]) / 100.0
- internal_temp = (response[12] << 8 | response[13]) / 10.0
- external_temp = (response[16] << 8 | response[17]) / 10.0
- operating_status = response[3]
- charging_status = response[4]
- control_status = response[5]
- # Timestamp
- now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- data.update({
- 'timestamp': now,
- 'pv_voltage_V': pv_voltage,
- 'battery_voltage_V': battery_voltage,
- 'charging_current_A': charging_current,
- 'internal_temp_C': internal_temp,
- 'external_temp_C': external_temp,
- 'operating_status_bits': format(operating_status, '08b'),
- 'charging_status_bits': format(charging_status, '08b'),
- 'control_status_bits': format(control_status, '08b'),
- 'raw_length': len(response),
- })
- except IndexError as ex:
- raise ValueError(f"Parsing failed, unexpected response length: {len(response)}") from ex
- return data
- def find_serial_port_by_keyword(keyword: str) -> str:
- """
- Optionally identify ports by keyword in description or name.
- E.g., 'Silicon Labs' or partial VID/PID match. Returns first match or raises.
- """
- ports = serial.tools.list_ports.comports()
- for p in ports:
- if keyword.lower() in (p.description or '').lower() or keyword.lower() in (p.device or '').lower():
- return p.device
- raise IOError(f"No serial port matching keyword '{keyword}' found among: {[p.device for p in ports]}")
- def open_lora_port(port_name: str) -> serial.Serial:
- """
- Open and return a Serial instance for the USB-TO-LoRa module.
- """
- ser = serial.Serial(port=port_name,
- baudrate=LORA_BAUD,
- bytesize=serial.EIGHTBITS,
- parity=serial.PARITY_NONE,
- stopbits=serial.STOPBITS_ONE,
- timeout=SERIAL_TIMEOUT)
- logging.info(f"Opened LoRa serial port {port_name} @ {LORA_BAUD}bps")
- return ser
- def send_query_and_receive(ser: serial.Serial) -> bytes:
- """
- Send the query frame over LoRa serial port and read the MPPT response.
- Retries if timeout or incorrect length.
- """
- frame = build_query_frame()
- for attempt in range(1, MAX_RETRIES+1):
- logging.debug(f"Attempt {attempt}: sending frame {frame.hex()}")
- ser.reset_input_buffer()
- ser.write(frame)
- # Wait a bit for round-trip (LoRa TX + MPPT processing + LoRa RX)
- time.sleep(0.5 + QUERY_INTERVAL*0) # base delay; adjust if needed
- # Read whatever bytes have arrived
- resp = ser.read(256) # read up to 256 bytes; timeouts used
- if resp:
- logging.debug(f"Received raw response (len={len(resp)}): {resp.hex()}")
- return resp
- else:
- logging.warning(f"No response on attempt {attempt}, retrying...")
- raise TimeoutError(f"No response after {MAX_RETRIES} attempts")
- def main():
- # Configure logging to console
- logging.basicConfig(level=logging.INFO,
- format='%(asctime)s [%(levelname)s] %(message)s',
- datefmt='%H:%M:%S')
- # Optionally auto-detect LoRa port by keyword; fallback to LORA_PORT
- try:
- port = find_serial_port_by_keyword('Silicon') # example keyword
- except Exception:
- port = LORA_PORT
- logging.info(f"Using LoRa port: {port}")
- try:
- ser_lora = open_lora_port(port)
- except Exception as e:
- logging.error(f"Failed to open LoRa port: {e}")
- return
- try:
- while True:
- try:
- raw = send_query_and_receive(ser_lora)
- data = parse_response(raw)
- # Display parsed data
- logging.info("MPPT Data:")
- for k, v in data.items():
- logging.info(f" {k}: {v}")
- except Exception as ex:
- logging.error(f"Error during query/parse: {ex}")
- # Wait before next query
- time.sleep(QUERY_INTERVAL)
- except KeyboardInterrupt:
- logging.info("Interrupted by user; exiting.")
- finally:
- ser_lora.close()
- logging.info("Serial port closed.")
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement