Guest User

Untitled

a guest
Jan 1st, 2026
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.70 KB | None | 0 0
  1. import json
  2. from dataclasses import asdict
  3. from datetime import datetime, timedelta, timezone
  4. from zoneinfo import ZoneInfo
  5.  
  6. import websocket
  7. from chip.clusters import Objects as clusters
  8.  
  9.  
  10. WEBSOCKET_ADDRESS = "ws://192.168.1.59:5580/ws"
  11. TIMEZONE_IANA = "Europe/Berlin"
  12. DEVICES_TO_SYNC = [(6, 0)] # (node id, endpoint id)
  13.  
  14. MATTER_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)
  15. MICROSECONDS_PER_SECOND = 1_000_000
  16.  
  17.  
  18. class MatterTimeSync:
  19.     def __init__(self, ws_address, tz_name):
  20.         self.ws_address = ws_address
  21.         self.tz_name = tz_name
  22.         self.ws = websocket.WebSocket()
  23.         self.message_counter = 1
  24.  
  25.     def sync_device(self, node_id, endpoint):
  26.         tz_info = self.get_timezone_info(self.tz_name)
  27.  
  28.         timezone_obj = {"offset": tz_info["offset_seconds"], "validAt": 0, "name": self.tz_name}
  29.         self.send_command(
  30.             node_id, endpoint,
  31.             clusters.TimeSynchronization.Commands.SetTimeZone(timeZone=[timezone_obj])
  32.         )
  33.  
  34.         dst_obj = {
  35.             "offset": tz_info["dst_adjustment_seconds"],
  36.             "validStarting": self.to_matter_microseconds(tz_info["dst_start"]),
  37.             "validUntil": self.to_matter_microseconds(tz_info["dst_end"]),
  38.         }
  39.         self.send_command(
  40.             node_id, endpoint,
  41.             clusters.TimeSynchronization.Commands.SetDSTOffset(DSTOffset=[dst_obj])
  42.         )
  43.  
  44.         now_utc = datetime.now(timezone.utc)
  45.         self.send_command(
  46.             node_id, endpoint,
  47.             clusters.TimeSynchronization.Commands.SetUTCTime(
  48.                 UTCTime=self.to_matter_microseconds(now_utc), granularity=4
  49.             )
  50.         )
  51.  
  52.     def connect(self):
  53.         self.ws.connect(self.ws_address)
  54.         print(self.ws.recv())
  55.  
  56.     def close(self):
  57.         self.ws.close()
  58.  
  59.     @staticmethod
  60.     def to_matter_microseconds(dt):
  61.         if not dt:
  62.             return None
  63.         delta = dt - MATTER_EPOCH
  64.         return int(delta.total_seconds() * MICROSECONDS_PER_SECOND)
  65.  
  66.     @staticmethod
  67.     def get_timezone_info(tz_name):
  68.         tz = ZoneInfo(tz_name)
  69.         now_utc = datetime.now(timezone.utc)
  70.         year = now_utc.year
  71.  
  72.         monthly_offsets = [
  73.             datetime(year, month, 1, tzinfo=timezone.utc).astimezone(tz).utcoffset()
  74.             for month in range(1, 13)
  75.         ]
  76.  
  77.         standard_offset = min(monthly_offsets)
  78.         standard_seconds = int(standard_offset.total_seconds())
  79.         max_offset = max(monthly_offsets)
  80.         dst_adjustment_seconds = int(max_offset.total_seconds()) - standard_seconds
  81.  
  82.         dst_start, dst_end = MatterTimeSync._find_dst_transitions(year, tz, timezone.utc)
  83.  
  84.         return {
  85.             "offset_seconds": standard_seconds,
  86.             "dst_adjustment_seconds": dst_adjustment_seconds,
  87.             "dst_start": dst_start,
  88.             "dst_end": dst_end,
  89.         }
  90.  
  91.     @staticmethod
  92.     def _find_dst_transitions(year, tz, utc):
  93.         start = datetime(year, 1, 1, tzinfo=utc)
  94.         end = datetime(year + 1, 1, 1, tzinfo=utc)
  95.         dst_start = dst_end = None
  96.         current_time = start
  97.         previous_offset = current_time.astimezone(tz).utcoffset()
  98.  
  99.         while current_time < end:
  100.             current_offset = current_time.astimezone(tz).utcoffset()
  101.             if current_offset != previous_offset:
  102.                 if int(current_offset.total_seconds()) > int(previous_offset.total_seconds()):
  103.                     dst_start = current_time
  104.                 else:
  105.                     dst_end = current_time
  106.                 previous_offset = current_offset
  107.             current_time += timedelta(hours=1)
  108.  
  109.         return dst_start, dst_end
  110.  
  111.     @staticmethod
  112.     def _dataclass_to_dict(obj):
  113.         return asdict(obj, dict_factory=lambda items: {str(k): v for k, v in items})
  114.  
  115.     def send_command(self, node_id, endpoint, command):
  116.         command_name = str(command).rsplit(".", 1)[-1].split("(", 1)[0]
  117.         message = {
  118.             "message_id": str(self.message_counter),
  119.             "command": "device_command",
  120.             "args": {
  121.                 "endpoint_id": endpoint,
  122.                 "node_id": node_id,
  123.                 "payload": self._dataclass_to_dict(command),
  124.                 "cluster_id": command.cluster_id,
  125.                 "command_name": command_name,
  126.             },
  127.         }
  128.         json_message = json.dumps(message)
  129.         print(json_message)
  130.         self.ws.send(json_message)
  131.         print(self.ws.recv())
  132.         self.message_counter += 1
  133.  
  134. if __name__ == "__main__":
  135.     sync = MatterTimeSync(WEBSOCKET_ADDRESS, TIMEZONE_IANA)
  136.     sync.connect()
  137.     for node_id, endpoint_id in DEVICES_TO_SYNC:
  138.         sync.sync_device(node_id, endpoint_id)
  139.     sync.close()
  140.  
Advertisement
Add Comment
Please, Sign In to add comment