Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import json
from dataclasses import asdict
from datetime import datetime, timedelta, timezone
from typing import Optional
from zoneinfo import ZoneInfo

import websocket
from chip.clusters import Objects as clusters
# WebSocket endpoint that MatterTimeSync connects to.
WEBSOCKET_ADDRESS = "ws://192.168.1.59:5580/ws"
# IANA name of the time zone pushed to every device.
TIMEZONE_IANA = "Europe/Berlin"
# Devices to synchronise, as (node id, endpoint id) pairs.
DEVICES_TO_SYNC = [(6, 0)]  # (node id, endpoint id)
# Reference instant that to_matter_microseconds() measures from.
MATTER_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)
MICROSECONDS_PER_SECOND = 1_000_000
class MatterTimeSync:
    """Push time zone, DST and UTC time to Matter nodes over a WebSocket.

    Each setting is sent as a Time Synchronization cluster command wrapped in
    a JSON ``device_command`` message. Every request blocks until one reply
    frame has been read from the socket.
    """

    def __init__(self, ws_address: str, tz_name: str) -> None:
        """Store the connection settings; no network I/O happens here.

        Args:
            ws_address: WebSocket URL to connect to.
            tz_name: IANA time zone name, e.g. ``"Europe/Berlin"``.
        """
        self.ws_address = ws_address
        self.tz_name = tz_name
        self.ws = websocket.WebSocket()
        # Sent as ``message_id``; incremented after every command.
        self.message_counter = 1

    def sync_device(self, node_id: int, endpoint: int) -> None:
        """Send SetTimeZone, SetDSTOffset and SetUTCTime to one endpoint.

        Args:
            node_id: Node id of the target device.
            endpoint: Endpoint id on that node.
        """
        tz_info = self.get_timezone_info(self.tz_name)
        commands = clusters.TimeSynchronization.Commands

        timezone_obj = {
            "offset": tz_info["offset_seconds"],
            "validAt": 0,
            "name": self.tz_name,
        }
        self.send_command(
            node_id, endpoint, commands.SetTimeZone(timeZone=[timezone_obj])
        )

        dst_obj = self._build_dst_offset(tz_info)
        self.send_command(
            node_id, endpoint, commands.SetDSTOffset(DSTOffset=[dst_obj])
        )

        # Taken as late as possible so the device receives a fresh timestamp.
        now_utc = datetime.now(timezone.utc)
        self.send_command(
            node_id,
            endpoint,
            commands.SetUTCTime(
                UTCTime=self.to_matter_microseconds(now_utc),
                # NOTE(review): 4 is presumably the microsecond granularity
                # enum value - confirm against the cluster definition.
                granularity=4,
            ),
        )

    def connect(self) -> None:
        """Open the WebSocket and print the first frame the server sends."""
        self.ws.connect(self.ws_address)
        print(self.ws.recv())

    def close(self) -> None:
        """Close the WebSocket."""
        self.ws.close()

    @staticmethod
    def to_matter_microseconds(dt: Optional[datetime]) -> Optional[int]:
        """Convert an aware datetime to microseconds since ``MATTER_EPOCH``.

        Integer arithmetic is used throughout, so the result is exact (a float
        multiplication could be off by one microsecond).

        Args:
            dt: Timezone-aware datetime, or ``None``.

        Returns:
            Whole microseconds elapsed since ``MATTER_EPOCH``, or ``None``
            when *dt* is ``None``.
        """
        if dt is None:
            return None
        delta = dt - MATTER_EPOCH
        whole_seconds = delta.days * 86_400 + delta.seconds
        return whole_seconds * MICROSECONDS_PER_SECOND + delta.microseconds

    @staticmethod
    def get_timezone_info(tz_name: str, now: Optional[datetime] = None) -> dict:
        """Derive the standard offset and the relevant DST period for a zone.

        The standard offset is the smallest UTC offset seen on the first of
        each month of the current year; the DST adjustment is the difference
        between the largest and the smallest of those offsets.

        Args:
            tz_name: IANA time zone name.
            now: Timezone-aware reference instant; defaults to the current
                time. Mainly useful for reproducible results.

        Returns:
            A dict with ``offset_seconds``, ``dst_adjustment_seconds``,
            ``dst_start`` and ``dst_end``. The two datetimes are in UTC and
            are ``None`` when no DST period is active or upcoming.
        """
        tz = ZoneInfo(tz_name)
        if now is None:
            now_utc = datetime.now(timezone.utc)
        else:
            now_utc = now.astimezone(timezone.utc)
        year = now_utc.year

        monthly_offsets = [
            datetime(year, month, 1, tzinfo=timezone.utc).astimezone(tz).utcoffset()
            for month in range(1, 13)
        ]
        standard_seconds = int(min(monthly_offsets).total_seconds())
        dst_adjustment_seconds = (
            int(max(monthly_offsets).total_seconds()) - standard_seconds
        )

        dst_start, dst_end = MatterTimeSync._current_dst_period(now_utc, tz)
        return {
            "offset_seconds": standard_seconds,
            "dst_adjustment_seconds": dst_adjustment_seconds,
            "dst_start": dst_start,
            "dst_end": dst_end,
        }

    @staticmethod
    def _current_dst_period(now_utc: datetime, tz) -> tuple:
        """Return the DST period that is active at, or next after, *now_utc*.

        Scanning only the current calendar year yields an end that precedes
        the start for zones whose DST spans New Year (southern hemisphere),
        so the neighbouring years are scanned as well.

        Returns:
            ``(start, end)`` as UTC datetimes with ``start < end`` and
            ``end > now_utc``, or ``(None, None)`` if no such period exists.
        """
        starts = []
        ends = []
        # Years are visited in ascending order, so both lists stay sorted.
        for year in (now_utc.year - 1, now_utc.year, now_utc.year + 1):
            start, end = MatterTimeSync._find_dst_transitions(
                year, tz, timezone.utc
            )
            if start is not None:
                starts.append(start)
            if end is not None:
                ends.append(end)

        for end in ends:
            if end <= now_utc:
                continue  # this period is already over
            earlier_starts = [start for start in starts if start < end]
            if earlier_starts:
                return earlier_starts[-1], end
        return None, None

    @staticmethod
    def _find_dst_transitions(year: int, tz, utc) -> tuple:
        """Locate the UTC offset changes of *tz* within one calendar year.

        The year is walked in one-hour steps, so transitions are reported at
        hourly resolution.

        Args:
            year: Calendar year to scan (bounded in the *utc* zone).
            tz: Time zone whose offset is inspected.
            utc: tzinfo used to build the scan timestamps.

        Returns:
            ``(dst_start, dst_end)``: the last instant in the year at which
            the offset increased, and the last at which it decreased. Either
            is ``None`` if no such change occurred.
        """
        end = datetime(year + 1, 1, 1, tzinfo=utc)
        step = timedelta(hours=1)
        dst_start = dst_end = None

        current_time = datetime(year, 1, 1, tzinfo=utc)
        previous_offset = current_time.astimezone(tz).utcoffset()
        while current_time < end:
            current_offset = current_time.astimezone(tz).utcoffset()
            if current_offset != previous_offset:
                if current_offset > previous_offset:
                    dst_start = current_time
                else:
                    dst_end = current_time
                previous_offset = current_offset
            current_time += step
        return dst_start, dst_end

    @staticmethod
    def _build_dst_offset(tz_info: dict) -> dict:
        """Build the single DSTOffset entry sent with SetDSTOffset.

        When no DST period is known, a zero-offset entry that starts at 0 and
        never expires is produced instead of one with a null ``validStarting``.
        """
        dst_start = tz_info["dst_start"]
        dst_end = tz_info["dst_end"]
        if dst_start is None or dst_end is None:
            return {"offset": 0, "validStarting": 0, "validUntil": None}
        return {
            "offset": tz_info["dst_adjustment_seconds"],
            "validStarting": MatterTimeSync.to_matter_microseconds(dst_start),
            "validUntil": MatterTimeSync.to_matter_microseconds(dst_end),
        }

    @staticmethod
    def _dataclass_to_dict(obj) -> dict:
        """Convert a dataclass instance to a dict with string keys, recursively."""
        return asdict(obj, dict_factory=lambda items: {str(k): v for k, v in items})

    @staticmethod
    def _command_name(command) -> str:
        """Return the class name of *command*.

        The name is read from the type rather than parsed out of
        ``str(command)``, which breaks as soon as any field's repr contains
        a dot.
        """
        return type(command).__name__

    def send_command(self, node_id: int, endpoint: int, command) -> None:
        """Send one cluster command and print the request and the reply.

        Args:
            node_id: Node id of the target device.
            endpoint: Endpoint id on that node.
            command: Dataclass command object exposing ``cluster_id``.
        """
        message = {
            "message_id": str(self.message_counter),
            "command": "device_command",
            "args": {
                "endpoint_id": endpoint,
                "node_id": node_id,
                "payload": self._dataclass_to_dict(command),
                "cluster_id": command.cluster_id,
                "command_name": self._command_name(command),
            },
        }
        json_message = json.dumps(message)
        print(json_message)
        self.ws.send(json_message)
        print(self.ws.recv())
        self.message_counter += 1
if __name__ == "__main__":
    # Connect once, sync every configured device, then disconnect.
    sync = MatterTimeSync(WEBSOCKET_ADDRESS, TIMEZONE_IANA)
    try:
        sync.connect()
        for node_id, endpoint_id in DEVICES_TO_SYNC:
            sync.sync_device(node_id, endpoint_id)
    finally:
        # Release the socket even when connecting or a command fails.
        sync.close()
Advertisement
Add Comment
Please, Sign In to add comment