Advertisement
Guest User

Untitled

a guest
Jul 23rd, 2019
123
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.97 KB | None | 0 0
  1. import asyncio
  2. import bellows.ezsp
  3. import logging
  4.  
  5. from bellows.zigbee.application import ControllerApplication
  6. from zigpy.exceptions import ZigbeeException, DeliveryError
  7. from zigpy.util import retryable
  8.  
  9.  
  10. DEVICE_NWK = 0x80af
  11. DATABASE = '/home/ha/.homeassistant/zigbee.db'
  12. HUSBZB_PORT = '/dev/ttyUSB0'
  13.  
  14. _LOGGER = logging.getLogger(__name__)
  15.  
  16.  
  17. async def start():
  18. global APP
  19.  
  20. radio = bellows.ezsp.EZSP()
  21. await radio.connect(HUSBZB_PORT, 57600)
  22.  
  23. APP = ControllerApplication(radio, DATABASE)
  24. await APP.startup(auto_form=True)
  25. return APP
  26.  
  27.  
  28. @retryable((DeliveryError, asyncio.TimeoutError), tries=5)
  29. async def read_attr(cluster, attrs):
  30. return await cluster.read_attributes(attrs, allow_cache=False)
  31.  
  32.  
  33. async def scan_cluster(cluster):
  34. supported = {}
  35. unsupported = {}
  36. for attr_id, attr in cluster.attributes.items():
  37. attr_name = attr[0]
  38. _LOGGER.info("Trying '{}' attribute attr id: {}".format(attr_name,
  39. attr_id))
  40. res, fail = {}, {}
  41. try:
  42. res, fail = await read_attr(cluster, [attr_name])
  43. except ZigbeeException as ex:
  44. _LOGGER.error(
  45. "Failed to read '{}' attribute. Error: {}".format(attr_name,
  46. ex)
  47. )
  48.  
  49. if attr_name in res:
  50. val = res.get(attr_name)
  51. _LOGGER.info("'{}' = {}".format(attr_name, val))
  52. supported["0x{:04x}-{}".format(attr_id, attr_name)] = val
  53. elif attr_name in fail:
  54. res = fail[attr_name]
  55. unsupported["0x{:04x}-{}".format(attr_id, attr_name)] = res
  56. _LOGGER.debug("attr {} read fail: {}".format(attr_name, fail))
  57. else:
  58. _LOGGER.error("Something unexpected reading '{}' attribute. {}/{}".
  59. format(attr_name, res, fail))
  60. await asyncio.sleep(0.2)
  61.  
  62. print("All supported attributes")
  63. for attr in sorted(iter(supported.keys())):
  64. print("'{}' = {}".format(attr, supported[attr]))
  65. print('===============================')
  66. print("All un-supported attributes")
  67. for attr in sorted(iter(unsupported.keys())):
  68. print("'{}' = {}".format(attr, unsupported[attr]))
  69.  
  70.  
  71. async def task(device_nwk):
  72. app = await start()
  73. dev = app.get_device(nwk=device_nwk)
  74. print("Scanning device 0x{:04x}".format(device_nwk))
  75. for epid, ep in dev.endpoints.items():
  76. if epid == 0:
  77. continue
  78. print('===============================')
  79. print("endpoint #{}".format(epid))
  80. await scan_endpoint(ep)
  81.  
  82.  
  83. async def scan_endpoint(ep):
  84. for cluster in ep.in_clusters.values():
  85. print('===============================')
  86. print("{} cluster".format(cluster.ep_attribute))
  87. await scan_cluster(cluster)
  88.  
  89.  
  90. def main():
  91. _LOGGER.setLevel(logging.DEBUG)
  92. loop = asyncio.get_event_loop()
  93. loop.run_until_complete(task(DEVICE_NWK))
  94.  
  95.  
  96. if __name__ == '__main__':
  97. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement