Guest User

Untitled

a guest
Nov 26th, 2025
52
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.76 KB | None | 0 0
  1. """Low level UDP client implementation for pymarstek."""
  2.  
  3. from __future__ import annotations
  4.  
  5. import asyncio
  6. from contextlib import suppress
  7. import ipaddress
  8. import json
  9. import logging
  10. import socket
  11. from typing import Any
  12.  
  13. try:
  14. import psutil # type: ignore[import-not-found]
  15. except Exception: # noqa: BLE001 - optional dependency
  16. psutil = None # type: ignore[assignment]
  17.  
  18. from .command_builder import discover, get_es_mode, get_pv_status
  19. from .const import DEFAULT_UDP_PORT, DISCOVERY_TIMEOUT
  20. from .data_parser import (
  21. merge_device_status,
  22. parse_es_mode_response,
  23. parse_pv_status_response,
  24. )
  25.  
  26. _LOGGER = logging.getLogger(__name__)
  27.  
  28.  
  29. class MarstekUDPClient:
  30. """UDP client for communicating with Marstek devices."""
  31.  
  32. def __init__(self, port: int = DEFAULT_UDP_PORT) -> None:
  33. self._port = port
  34. self._socket: socket.socket | None = None
  35. self._pending_requests: dict[int, asyncio.Future] = {}
  36. self._response_cache: dict[int, dict[str, Any]] = {}
  37. self._listen_task: asyncio.Task | None = None
  38. self._loop: asyncio.AbstractEventLoop | None = None
  39.  
  40. self._discovery_cache: list[dict[str, Any]] | None = None
  41. self._cache_timestamp: float = 0
  42. self._cache_duration: float = 30.0
  43.  
  44. self._local_send_ip: str = "0.0.0.0"
  45. self._polling_paused: dict[str, bool] = {}
  46. self._polling_lock: asyncio.Lock = asyncio.Lock()
  47.  
  48. async def async_setup(self) -> None:
  49. """Prepare the UDP socket."""
  50. if self._socket is not None:
  51. return
  52.  
  53. self._loop = asyncio.get_running_loop()
  54.  
  55. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  56. sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
  57. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  58. sock.setblocking(False)
  59. sock.bind(("0.0.0.0", self._port))
  60. self._socket = sock
  61. _LOGGER.debug("UDP client bound to %s:%s", sock.getsockname()[0], sock.getsockname()[1])
  62.  
  63. async def async_cleanup(self) -> None:
  64. """Close the UDP socket."""
  65. if self._listen_task and not self._listen_task.done():
  66. self._listen_task.cancel()
  67. with suppress(asyncio.CancelledError):
  68. await self._listen_task
  69. if self._socket:
  70. self._socket.close()
  71. self._socket = None
  72.  
  73. def _get_broadcast_addresses(self) -> list[str]:
  74. addresses = {"255.255.255.255"}
  75. if psutil is not None:
  76. try:
  77. for addrs in psutil.net_if_addrs().values():
  78. for addr in addrs:
  79. if addr.family == socket.AF_INET and not addr.address.startswith("127."):
  80. if getattr(addr, "broadcast", None):
  81. addresses.add(addr.broadcast)
  82. elif getattr(addr, "netmask", None):
  83. try:
  84. network = ipaddress.IPv4Network(
  85. f"{addr.address}/{addr.netmask}", strict=False
  86. )
  87. addresses.add(str(network.broadcast_address))
  88. except (ValueError, OSError):
  89. continue
  90. except OSError as err:
  91. _LOGGER.warning("Failed to obtain network interfaces: %s", err)
  92. try:
  93. local_ips = {
  94. addr.address
  95. for addrs in psutil.net_if_addrs().values()
  96. for addr in addrs
  97. if addr.family == socket.AF_INET
  98. }
  99. addresses -= local_ips
  100. except OSError:
  101. pass
  102. return list(addresses)
  103.  
  104. def _is_cache_valid(self) -> bool:
  105. if self._discovery_cache is None:
  106. return False
  107. loop = self._loop or asyncio.get_running_loop()
  108. return (loop.time() - self._cache_timestamp) < self._cache_duration
  109.  
  110. def clear_discovery_cache(self) -> None:
  111. self._discovery_cache = None
  112. self._cache_timestamp = 0
  113.  
  114. async def _send_udp_message(self, message: str, target_ip: str, target_port: int) -> None:
  115. if not self._socket:
  116. await self.async_setup()
  117. assert self._socket is not None
  118. data = message.encode("utf-8")
  119. self._socket.sendto(data, (target_ip, target_port))
  120. _LOGGER.debug("Send: %s:%d | %s", target_ip, target_port, message)
  121.  
  122. async def send_request(
  123. self,
  124. message: str,
  125. target_ip: str,
  126. target_port: int,
  127. timeout: float = 5.0,
  128. *,
  129. quiet_on_timeout: bool = False,
  130. ) -> dict[str, Any]:
  131. if not self._socket:
  132. await self.async_setup()
  133. assert self._socket is not None
  134.  
  135. try:
  136. message_obj = json.loads(message)
  137. request_id = message_obj["id"]
  138. except (json.JSONDecodeError, KeyError) as exc:
  139. raise ValueError("Invalid message: missing id") from exc
  140.  
  141. future: asyncio.Future = asyncio.Future()
  142. self._pending_requests[request_id] = future
  143.  
  144. try:
  145. if not self._listen_task or self._listen_task.done():
  146. loop = self._loop or asyncio.get_running_loop()
  147. self._listen_task = loop.create_task(self._listen_for_responses())
  148.  
  149. await self._send_udp_message(message, target_ip, target_port)
  150. _LOGGER.warning("Send request to %s:%d: %s", target_ip, target_port, message)
  151. return await asyncio.wait_for(future, timeout=timeout)
  152. except TimeoutError as err:
  153. if not quiet_on_timeout:
  154. _LOGGER.warning("Request timeout: %s:%d", target_ip, target_port)
  155. raise TimeoutError(f"Request timeout to {target_ip}:{target_port}") from err
  156. finally:
  157. self._pending_requests.pop(request_id, None)
  158.  
  159. async def _listen_for_responses(self) -> None:
  160. assert self._socket is not None
  161. loop = self._loop or asyncio.get_running_loop()
  162. while True:
  163. try:
  164. data, addr = await loop.sock_recvfrom(self._socket, 4096)
  165. response_text = data.decode("utf-8")
  166. try:
  167. response = json.loads(response_text)
  168. except json.JSONDecodeError:
  169. response = {"raw": response_text}
  170. request_id = response.get("id") if isinstance(response, dict) else None
  171. _LOGGER.debug("Recv: %s:%d | %s", addr[0], addr[1], response)
  172. if request_id:
  173. self._response_cache[request_id] = {
  174. "response": response,
  175. "addr": addr,
  176. "timestamp": loop.time(),
  177. }
  178. future = self._pending_requests.pop(request_id, None)
  179. if future and not future.done():
  180. future.set_result(response)
  181. except asyncio.CancelledError:
  182. break
  183. except OSError as err:
  184. _LOGGER.error("Error receiving UDP response: %s", err)
  185. await asyncio.sleep(1)
  186.  
  187. async def send_broadcast_request(self, message: str, timeout: float = DISCOVERY_TIMEOUT) -> list[dict[str, Any]]:
  188. print(f"[DEBUG] ========== Starting device broadcast discovery ==========")
  189. print(f"[DEBUG] Broadcast message: {message}")
  190. print(f"[DEBUG] Timeout: {timeout} seconds")
  191. if not self._socket:
  192. await self.async_setup()
  193. assert self._socket is not None
  194.  
  195. try:
  196. message_obj = json.loads(message)
  197. request_id = message_obj["id"]
  198. print(f"[DEBUG] Request ID: {request_id}")
  199. except (json.JSONDecodeError, KeyError) as exc:
  200. _LOGGER.error("Invalid message for broadcast: %s", exc)
  201. return []
  202.  
  203. responses: list[dict[str, Any]] = []
  204. loop = self._loop or asyncio.get_running_loop()
  205. start_time = loop.time()
  206.  
  207. future: asyncio.Future = asyncio.Future()
  208. self._pending_requests[request_id] = future
  209.  
  210. try:
  211. if not self._listen_task or self._listen_task.done():
  212. self._listen_task = loop.create_task(self._listen_for_responses())
  213.  
  214. broadcast_addresses = self._get_broadcast_addresses()
  215. print(f"[DEBUG] Broadcast addresses: {broadcast_addresses}")
  216. print(f"[DEBUG] Port: {self._port}")
  217. for address in broadcast_addresses:
  218. print(f"[DEBUG] Sending to broadcast address: {address}:{self._port}")
  219. await self._send_udp_message(message, address, self._port)
  220.  
  221. while (loop.time() - start_time) < timeout:
  222. cached = self._response_cache.pop(request_id, None)
  223. if cached:
  224. print(f"[DEBUG] Received device response: {cached['response']}")
  225. responses.append(cached["response"])
  226. await asyncio.sleep(0.1)
  227. finally:
  228. self._pending_requests.pop(request_id, None)
  229. print(f"[DEBUG] Broadcast discovery completed, received {len(responses)} response(s)")
  230. print(f"[DEBUG] ========== Broadcast discovery ended ==========")
  231. return responses
  232.  
  233. async def discover_devices(self, use_cache: bool = True) -> list[dict[str, Any]]:
  234. print(f"[DEBUG] ========== Starting device discovery ==========")
  235. print(f"[DEBUG] Use cache: {use_cache}")
  236. if use_cache and self._is_cache_valid():
  237. assert self._discovery_cache is not None
  238. print(f"[DEBUG] Using cached data, returning {len(self._discovery_cache)} device(s)")
  239. return self._discovery_cache.copy()
  240.  
  241. devices: list[dict[str, Any]] = []
  242. seen_devices: set[str] = set()
  243.  
  244. try:
  245. print(f"[DEBUG] Executing broadcast request...")
  246. responses = await self.send_broadcast_request(discover())
  247. print(f"[DEBUG] Received {len(responses)} response(s)")
  248. except OSError as err:
  249. _LOGGER.error("Device discovery failed: %s", err)
  250. responses = []
  251.  
  252. loop = self._loop or asyncio.get_running_loop()
  253.  
  254. for response in responses:
  255. result = response.get("result") if isinstance(response, dict) else None
  256. if not isinstance(result, dict):
  257. continue
  258.  
  259. device_id = (
  260. result.get("ip")
  261. or result.get("ble_mac")
  262. or result.get("wifi_mac")
  263. or f"device_{int(loop.time())}_{hash(str(result)) % 10000}"
  264. )
  265. if device_id in seen_devices:
  266. continue
  267. seen_devices.add(device_id)
  268.  
  269. devices.append(
  270. {
  271. "id": result.get("id", 0),
  272. "device_type": result.get("device", "Unknown"),
  273. "version": result.get("ver", 0),
  274. "wifi_name": result.get("wifi_name", ""),
  275. "ip": result.get("ip", ""),
  276. "wifi_mac": result.get("wifi_mac", ""),
  277. "ble_mac": result.get("ble_mac", ""),
  278. "mac": result.get("wifi_mac") or result.get("ble_mac", ""),
  279. "model": result.get("device", "Unknown"),
  280. "firmware": str(result.get("ver", 0)),
  281. }
  282. )
  283.  
  284. self._discovery_cache = devices.copy()
  285. self._cache_timestamp = loop.time()
  286. print(f"[DEBUG] Device discovery completed, found {len(devices)} device(s)")
  287. for i, device in enumerate(devices):
  288. print(f"[DEBUG] Device {i+1}: {device.get('device_type', 'Unknown')} - {device.get('ip', 'Unknown IP')}")
  289. print(f"[DEBUG] ========== Device discovery ended ==========")
  290. return devices
  291.  
  292. async def pause_polling(self, device_ip: str) -> None:
  293. async with self._polling_lock:
  294. self._polling_paused[device_ip] = True
  295.  
  296. async def resume_polling(self, device_ip: str) -> None:
  297. async with self._polling_lock:
  298. self._polling_paused[device_ip] = False
  299.  
  300. def is_polling_paused(self, device_ip: str) -> bool:
  301. return self._polling_paused.get(device_ip, False)
  302.  
  303. async def send_request_with_polling_control(
  304. self,
  305. message: str,
  306. target_ip: str,
  307. target_port: int,
  308. timeout: float = 5.0,
  309. ) -> dict[str, Any]:
  310. await self.pause_polling(target_ip)
  311. try:
  312. return await self.send_request(
  313. message, target_ip, target_port, timeout, quiet_on_timeout=True
  314. )
  315. finally:
  316. await self.resume_polling(target_ip)
  317.  
  318. async def get_device_status(
  319. self,
  320. device_ip: str,
  321. port: int = DEFAULT_UDP_PORT,
  322. timeout: float = 2.5,
  323. *,
  324. include_pv: bool = True,
  325. delay_between_requests: float = 2.0,
  326. ) -> dict[str, Any]:
  327. """Get complete device status including battery and PV data.
  328.  
  329. Args:
  330. device_ip: IP address of the device
  331. port: UDP port (default: DEFAULT_UDP_PORT)
  332. timeout: Request timeout in seconds
  333. include_pv: Whether to include PV status data
  334. delay_between_requests: Delay between ES and PV requests in seconds
  335.  
  336. Returns:
  337. Dictionary with complete device status
  338. """
  339. es_mode_data: dict[str, Any] | None = None
  340. pv_status_data: dict[str, Any] | None = None
  341.  
  342. # Get ES mode status
  343. try:
  344. es_mode_command = get_es_mode(0)
  345. es_mode_response = await self.send_request(
  346. es_mode_command, device_ip, port, timeout=timeout
  347. )
  348. es_mode_data = parse_es_mode_response(es_mode_response)
  349. _LOGGER.debug(
  350. "ES.GetMode parsed for %s: SOC=%s%%, Power=%sW, Mode=%s, Status=%s",
  351. device_ip,
  352. es_mode_data.get("battery_soc"),
  353. es_mode_data.get("battery_power"),
  354. es_mode_data.get("device_mode"),
  355. es_mode_data.get("battery_status"),
  356. )
  357. except (TimeoutError, OSError, ValueError) as err:
  358. _LOGGER.debug(
  359. "ES.GetMode failed for %s: %s", device_ip, err
  360. )
  361.  
  362. # Get PV status if requested
  363. if include_pv:
  364. await asyncio.sleep(delay_between_requests)
  365. try:
  366. pv_status_command = get_pv_status(0)
  367. pv_status_response = await self.send_request(
  368. pv_status_command, device_ip, port, timeout=timeout
  369. )
  370. pv_status_data = parse_pv_status_response(pv_status_response)
  371. _LOGGER.debug(
  372. "PV.GetStatus parsed for %s: PV1=%sW, PV2=%sW, PV3=%sW, PV4=%sW",
  373. device_ip,
  374. pv_status_data.get("pv1_power"),
  375. pv_status_data.get("pv2_power"),
  376. pv_status_data.get("pv3_power"),
  377. pv_status_data.get("pv4_power"),
  378. )
  379. except (TimeoutError, OSError, ValueError) as err:
  380. _LOGGER.debug(
  381. "PV.GetStatus failed for %s: %s", device_ip, err
  382. )
  383.  
  384. # Merge data
  385. loop = self._loop or asyncio.get_running_loop()
  386. return merge_device_status(
  387. es_mode_data=es_mode_data,
  388. pv_status_data=pv_status_data,
  389. device_ip=device_ip,
  390. last_update=loop.time(),
  391. )
  392.  
Advertisement
Add Comment
Please, Sign In to add comment