Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Low level UDP client implementation for pymarstek."""
- from __future__ import annotations
- import asyncio
- from contextlib import suppress
- import ipaddress
- import json
- import logging
- import socket
- from typing import Any
- try:
- import psutil # type: ignore[import-not-found]
- except Exception: # noqa: BLE001 - optional dependency
- psutil = None # type: ignore[assignment]
- from .command_builder import discover, get_es_mode, get_pv_status
- from .const import DEFAULT_UDP_PORT, DISCOVERY_TIMEOUT
- from .data_parser import (
- merge_device_status,
- parse_es_mode_response,
- parse_pv_status_response,
- )
- _LOGGER = logging.getLogger(__name__)
- class MarstekUDPClient:
- """UDP client for communicating with Marstek devices."""
- def __init__(self, port: int = DEFAULT_UDP_PORT) -> None:
- self._port = port
- self._socket: socket.socket | None = None
- self._pending_requests: dict[int, asyncio.Future] = {}
- self._response_cache: dict[int, dict[str, Any]] = {}
- self._listen_task: asyncio.Task | None = None
- self._loop: asyncio.AbstractEventLoop | None = None
- self._discovery_cache: list[dict[str, Any]] | None = None
- self._cache_timestamp: float = 0
- self._cache_duration: float = 30.0
- self._local_send_ip: str = "0.0.0.0"
- self._polling_paused: dict[str, bool] = {}
- self._polling_lock: asyncio.Lock = asyncio.Lock()
- async def async_setup(self) -> None:
- """Prepare the UDP socket."""
- if self._socket is not None:
- return
- self._loop = asyncio.get_running_loop()
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.setblocking(False)
- sock.bind(("0.0.0.0", self._port))
- self._socket = sock
- _LOGGER.debug("UDP client bound to %s:%s", sock.getsockname()[0], sock.getsockname()[1])
- async def async_cleanup(self) -> None:
- """Close the UDP socket."""
- if self._listen_task and not self._listen_task.done():
- self._listen_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._listen_task
- if self._socket:
- self._socket.close()
- self._socket = None
- def _get_broadcast_addresses(self) -> list[str]:
- addresses = {"255.255.255.255"}
- if psutil is not None:
- try:
- for addrs in psutil.net_if_addrs().values():
- for addr in addrs:
- if addr.family == socket.AF_INET and not addr.address.startswith("127."):
- if getattr(addr, "broadcast", None):
- addresses.add(addr.broadcast)
- elif getattr(addr, "netmask", None):
- try:
- network = ipaddress.IPv4Network(
- f"{addr.address}/{addr.netmask}", strict=False
- )
- addresses.add(str(network.broadcast_address))
- except (ValueError, OSError):
- continue
- except OSError as err:
- _LOGGER.warning("Failed to obtain network interfaces: %s", err)
- try:
- local_ips = {
- addr.address
- for addrs in psutil.net_if_addrs().values()
- for addr in addrs
- if addr.family == socket.AF_INET
- }
- addresses -= local_ips
- except OSError:
- pass
- return list(addresses)
- def _is_cache_valid(self) -> bool:
- if self._discovery_cache is None:
- return False
- loop = self._loop or asyncio.get_running_loop()
- return (loop.time() - self._cache_timestamp) < self._cache_duration
- def clear_discovery_cache(self) -> None:
- self._discovery_cache = None
- self._cache_timestamp = 0
- async def _send_udp_message(self, message: str, target_ip: str, target_port: int) -> None:
- if not self._socket:
- await self.async_setup()
- assert self._socket is not None
- data = message.encode("utf-8")
- self._socket.sendto(data, (target_ip, target_port))
- _LOGGER.debug("Send: %s:%d | %s", target_ip, target_port, message)
- async def send_request(
- self,
- message: str,
- target_ip: str,
- target_port: int,
- timeout: float = 5.0,
- *,
- quiet_on_timeout: bool = False,
- ) -> dict[str, Any]:
- if not self._socket:
- await self.async_setup()
- assert self._socket is not None
- try:
- message_obj = json.loads(message)
- request_id = message_obj["id"]
- except (json.JSONDecodeError, KeyError) as exc:
- raise ValueError("Invalid message: missing id") from exc
- future: asyncio.Future = asyncio.Future()
- self._pending_requests[request_id] = future
- try:
- if not self._listen_task or self._listen_task.done():
- loop = self._loop or asyncio.get_running_loop()
- self._listen_task = loop.create_task(self._listen_for_responses())
- await self._send_udp_message(message, target_ip, target_port)
- _LOGGER.warning("Send request to %s:%d: %s", target_ip, target_port, message)
- return await asyncio.wait_for(future, timeout=timeout)
- except TimeoutError as err:
- if not quiet_on_timeout:
- _LOGGER.warning("Request timeout: %s:%d", target_ip, target_port)
- raise TimeoutError(f"Request timeout to {target_ip}:{target_port}") from err
- finally:
- self._pending_requests.pop(request_id, None)
- async def _listen_for_responses(self) -> None:
- assert self._socket is not None
- loop = self._loop or asyncio.get_running_loop()
- while True:
- try:
- data, addr = await loop.sock_recvfrom(self._socket, 4096)
- response_text = data.decode("utf-8")
- try:
- response = json.loads(response_text)
- except json.JSONDecodeError:
- response = {"raw": response_text}
- request_id = response.get("id") if isinstance(response, dict) else None
- _LOGGER.debug("Recv: %s:%d | %s", addr[0], addr[1], response)
- if request_id:
- self._response_cache[request_id] = {
- "response": response,
- "addr": addr,
- "timestamp": loop.time(),
- }
- future = self._pending_requests.pop(request_id, None)
- if future and not future.done():
- future.set_result(response)
- except asyncio.CancelledError:
- break
- except OSError as err:
- _LOGGER.error("Error receiving UDP response: %s", err)
- await asyncio.sleep(1)
- async def send_broadcast_request(self, message: str, timeout: float = DISCOVERY_TIMEOUT) -> list[dict[str, Any]]:
- print(f"[DEBUG] ========== Starting device broadcast discovery ==========")
- print(f"[DEBUG] Broadcast message: {message}")
- print(f"[DEBUG] Timeout: {timeout} seconds")
- if not self._socket:
- await self.async_setup()
- assert self._socket is not None
- try:
- message_obj = json.loads(message)
- request_id = message_obj["id"]
- print(f"[DEBUG] Request ID: {request_id}")
- except (json.JSONDecodeError, KeyError) as exc:
- _LOGGER.error("Invalid message for broadcast: %s", exc)
- return []
- responses: list[dict[str, Any]] = []
- loop = self._loop or asyncio.get_running_loop()
- start_time = loop.time()
- future: asyncio.Future = asyncio.Future()
- self._pending_requests[request_id] = future
- try:
- if not self._listen_task or self._listen_task.done():
- self._listen_task = loop.create_task(self._listen_for_responses())
- broadcast_addresses = self._get_broadcast_addresses()
- print(f"[DEBUG] Broadcast addresses: {broadcast_addresses}")
- print(f"[DEBUG] Port: {self._port}")
- for address in broadcast_addresses:
- print(f"[DEBUG] Sending to broadcast address: {address}:{self._port}")
- await self._send_udp_message(message, address, self._port)
- while (loop.time() - start_time) < timeout:
- cached = self._response_cache.pop(request_id, None)
- if cached:
- print(f"[DEBUG] Received device response: {cached['response']}")
- responses.append(cached["response"])
- await asyncio.sleep(0.1)
- finally:
- self._pending_requests.pop(request_id, None)
- print(f"[DEBUG] Broadcast discovery completed, received {len(responses)} response(s)")
- print(f"[DEBUG] ========== Broadcast discovery ended ==========")
- return responses
- async def discover_devices(self, use_cache: bool = True) -> list[dict[str, Any]]:
- print(f"[DEBUG] ========== Starting device discovery ==========")
- print(f"[DEBUG] Use cache: {use_cache}")
- if use_cache and self._is_cache_valid():
- assert self._discovery_cache is not None
- print(f"[DEBUG] Using cached data, returning {len(self._discovery_cache)} device(s)")
- return self._discovery_cache.copy()
- devices: list[dict[str, Any]] = []
- seen_devices: set[str] = set()
- try:
- print(f"[DEBUG] Executing broadcast request...")
- responses = await self.send_broadcast_request(discover())
- print(f"[DEBUG] Received {len(responses)} response(s)")
- except OSError as err:
- _LOGGER.error("Device discovery failed: %s", err)
- responses = []
- loop = self._loop or asyncio.get_running_loop()
- for response in responses:
- result = response.get("result") if isinstance(response, dict) else None
- if not isinstance(result, dict):
- continue
- device_id = (
- result.get("ip")
- or result.get("ble_mac")
- or result.get("wifi_mac")
- or f"device_{int(loop.time())}_{hash(str(result)) % 10000}"
- )
- if device_id in seen_devices:
- continue
- seen_devices.add(device_id)
- devices.append(
- {
- "id": result.get("id", 0),
- "device_type": result.get("device", "Unknown"),
- "version": result.get("ver", 0),
- "wifi_name": result.get("wifi_name", ""),
- "ip": result.get("ip", ""),
- "wifi_mac": result.get("wifi_mac", ""),
- "ble_mac": result.get("ble_mac", ""),
- "mac": result.get("wifi_mac") or result.get("ble_mac", ""),
- "model": result.get("device", "Unknown"),
- "firmware": str(result.get("ver", 0)),
- }
- )
- self._discovery_cache = devices.copy()
- self._cache_timestamp = loop.time()
- print(f"[DEBUG] Device discovery completed, found {len(devices)} device(s)")
- for i, device in enumerate(devices):
- print(f"[DEBUG] Device {i+1}: {device.get('device_type', 'Unknown')} - {device.get('ip', 'Unknown IP')}")
- print(f"[DEBUG] ========== Device discovery ended ==========")
- return devices
- async def pause_polling(self, device_ip: str) -> None:
- async with self._polling_lock:
- self._polling_paused[device_ip] = True
- async def resume_polling(self, device_ip: str) -> None:
- async with self._polling_lock:
- self._polling_paused[device_ip] = False
- def is_polling_paused(self, device_ip: str) -> bool:
- return self._polling_paused.get(device_ip, False)
- async def send_request_with_polling_control(
- self,
- message: str,
- target_ip: str,
- target_port: int,
- timeout: float = 5.0,
- ) -> dict[str, Any]:
- await self.pause_polling(target_ip)
- try:
- return await self.send_request(
- message, target_ip, target_port, timeout, quiet_on_timeout=True
- )
- finally:
- await self.resume_polling(target_ip)
- async def get_device_status(
- self,
- device_ip: str,
- port: int = DEFAULT_UDP_PORT,
- timeout: float = 2.5,
- *,
- include_pv: bool = True,
- delay_between_requests: float = 2.0,
- ) -> dict[str, Any]:
- """Get complete device status including battery and PV data.
- Args:
- device_ip: IP address of the device
- port: UDP port (default: DEFAULT_UDP_PORT)
- timeout: Request timeout in seconds
- include_pv: Whether to include PV status data
- delay_between_requests: Delay between ES and PV requests in seconds
- Returns:
- Dictionary with complete device status
- """
- es_mode_data: dict[str, Any] | None = None
- pv_status_data: dict[str, Any] | None = None
- # Get ES mode status
- try:
- es_mode_command = get_es_mode(0)
- es_mode_response = await self.send_request(
- es_mode_command, device_ip, port, timeout=timeout
- )
- es_mode_data = parse_es_mode_response(es_mode_response)
- _LOGGER.debug(
- "ES.GetMode parsed for %s: SOC=%s%%, Power=%sW, Mode=%s, Status=%s",
- device_ip,
- es_mode_data.get("battery_soc"),
- es_mode_data.get("battery_power"),
- es_mode_data.get("device_mode"),
- es_mode_data.get("battery_status"),
- )
- except (TimeoutError, OSError, ValueError) as err:
- _LOGGER.debug(
- "ES.GetMode failed for %s: %s", device_ip, err
- )
- # Get PV status if requested
- if include_pv:
- await asyncio.sleep(delay_between_requests)
- try:
- pv_status_command = get_pv_status(0)
- pv_status_response = await self.send_request(
- pv_status_command, device_ip, port, timeout=timeout
- )
- pv_status_data = parse_pv_status_response(pv_status_response)
- _LOGGER.debug(
- "PV.GetStatus parsed for %s: PV1=%sW, PV2=%sW, PV3=%sW, PV4=%sW",
- device_ip,
- pv_status_data.get("pv1_power"),
- pv_status_data.get("pv2_power"),
- pv_status_data.get("pv3_power"),
- pv_status_data.get("pv4_power"),
- )
- except (TimeoutError, OSError, ValueError) as err:
- _LOGGER.debug(
- "PV.GetStatus failed for %s: %s", device_ip, err
- )
- # Merge data
- loop = self._loop or asyncio.get_running_loop()
- return merge_device_status(
- es_mode_data=es_mode_data,
- pv_status_data=pv_status_data,
- device_ip=device_ip,
- last_update=loop.time(),
- )
Advertisement
Add Comment
Please, Sign In to add comment