Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import bellows.ezsp
- import logging
- from bellows.zigbee.application import ControllerApplication
- from zigpy.exceptions import ZigbeeException, DeliveryError
- from zigpy.util import retryable
- DEVICE_NWK = 0x80af
- DATABASE = '/home/ha/.homeassistant/zigbee.db'
- HUSBZB_PORT = '/dev/ttyUSB0'
- _LOGGER = logging.getLogger(__name__)
- async def start():
- global APP
- radio = bellows.ezsp.EZSP()
- await radio.connect(HUSBZB_PORT, 57600)
- APP = ControllerApplication(radio, DATABASE)
- await APP.startup(auto_form=True)
- return APP
- @retryable((DeliveryError, asyncio.TimeoutError), tries=5)
- async def read_attr(cluster, attrs):
- return await cluster.read_attributes(attrs, allow_cache=False)
- async def scan_cluster(cluster):
- supported = {}
- unsupported = {}
- for attr_id, attr in cluster.attributes.items():
- attr_name = attr[0]
- _LOGGER.info("Trying '{}' attribute attr id: {}".format(attr_name,
- attr_id))
- res, fail = {}, {}
- try:
- res, fail = await read_attr(cluster, [attr_name])
- except ZigbeeException as ex:
- _LOGGER.error(
- "Failed to read '{}' attribute. Error: {}".format(attr_name,
- ex)
- )
- if attr_name in res:
- val = res.get(attr_name)
- _LOGGER.info("'{}' = {}".format(attr_name, val))
- supported["0x{:04x}-{}".format(attr_id, attr_name)] = val
- elif attr_name in fail:
- res = fail[attr_name]
- unsupported["0x{:04x}-{}".format(attr_id, attr_name)] = res
- _LOGGER.debug("attr {} read fail: {}".format(attr_name, fail))
- else:
- _LOGGER.error("Something unexpected reading '{}' attribute. {}/{}".
- format(attr_name, res, fail))
- await asyncio.sleep(0.2)
- print("All supported attributes")
- for attr in sorted(iter(supported.keys())):
- print("'{}' = {}".format(attr, supported[attr]))
- print('===============================')
- print("All un-supported attributes")
- for attr in sorted(iter(unsupported.keys())):
- print("'{}' = {}".format(attr, unsupported[attr]))
- async def task(device_nwk):
- app = await start()
- dev = app.get_device(nwk=device_nwk)
- print("Scanning device 0x{:04x}".format(device_nwk))
- for epid, ep in dev.endpoints.items():
- if epid == 0:
- continue
- print('===============================')
- print("endpoint #{}".format(epid))
- await scan_endpoint(ep)
- async def scan_endpoint(ep):
- for cluster in ep.in_clusters.values():
- print('===============================')
- print("{} cluster".format(cluster.ep_attribute))
- await scan_cluster(cluster)
- def main():
- _LOGGER.setLevel(logging.DEBUG)
- loop = asyncio.get_event_loop()
- loop.run_until_complete(task(DEVICE_NWK))
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement