Advertisement
Guest User

Untitled

a guest
Nov 21st, 2016
452
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.75 KB | None | 0 0
  1. """
  2. Support for Broadlink RM2.
  3.  
  4. For more details about this platform, please refer to the documentation at
  5. https://home-assistant.io/components/switch.broadlink/
  6. """
  7. import logging
  8.  
  9. import voluptuous as vol
  10.  
  11. from homeassistant.components.switch import (SwitchDevice, PLATFORM_SCHEMA)
  12. from homeassistant.const import (
  13. CONF_FRIENDLY_NAME, CONF_SWITCHES, CONF_COMMAND_OFF,
  14. CONF_COMMAND_ON, CONF_COMMAND_STATE, CONF_HOST, CONF_MAC)
  15. import homeassistant.helpers.config_validation as cv
  16.  
  17. _LOGGER = logging.getLogger(__name__)
  18.  
  19. SWITCH_SCHEMA = vol.Schema({
  20. vol.Optional(CONF_COMMAND_OFF, default='true'): cv.string,
  21. vol.Optional(CONF_COMMAND_ON, default='true'): cv.string,
  22. vol.Optional(CONF_COMMAND_STATE): cv.string,
  23. vol.Optional(CONF_FRIENDLY_NAME): cv.string,
  24. })
  25.  
  26. PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
  27. vol.Required(CONF_SWITCHES): vol.Schema({cv.slug: SWITCH_SCHEMA}),
  28. vol.Required(CONF_HOST): cv.string,
  29. vol.Required(CONF_MAC): cv.string,
  30. })
  31.  
  32.  
  33. def setup_platform(hass, config, add_devices, discovery_info=None):
  34. """Setup Broadlink switches."""
  35. import binascii
  36. devices = config.get(CONF_SWITCHES, {})
  37. switches = []
  38. ip_addr = (config.get(CONF_HOST), 80)
  39. mac_addr = binascii.unhexlify(
  40. config.get(CONF_MAC).encode().replace(b':', b''))
  41.  
  42. try:
  43. broadlink = Broadlink.Device(ip_addr, mac_addr)
  44. auth = broadlink.auth()
  45. if auth:
  46. _LOGGER.info('Broadlink connection successfully established')
  47.  
  48. except ValueError as error:
  49. _LOGGER.error(error)
  50.  
  51. for object_id, device_config in devices.items():
  52.  
  53. switches.append(
  54. BroadlinkSwitch(
  55. hass,
  56. object_id,
  57. device_config.get(CONF_FRIENDLY_NAME, object_id),
  58. device_config.get(CONF_COMMAND_ON),
  59. device_config.get(CONF_COMMAND_OFF),
  60. broadlink
  61. )
  62. )
  63.  
  64. if not switches:
  65. _LOGGER.error("No switches added")
  66. return False
  67.  
  68. add_devices(switches)
  69.  
  70.  
  71. class BroadlinkSwitch(SwitchDevice):
  72. """Representation of an Broadlink switch."""
  73.  
  74. def __init__(self, hass, object_id, friendly_name, command_on,
  75. command_off, broadlink):
  76. """Initialize the switch."""
  77. self._hass = hass
  78. self._name = friendly_name
  79. self._state = False
  80. self._command_on = command_on
  81. self._command_off = command_off
  82. self._broadlink = broadlink
  83.  
  84. @staticmethod
  85. def _switch(command):
  86. """Execute the actual commands."""
  87.  
  88. @property
  89. def name(self):
  90. """Return the name of the switch."""
  91. return self._name
  92.  
  93. @property
  94. def is_on(self):
  95. """Return true if device is on."""
  96. return self._state
  97.  
  98. def update(self):
  99. """Update device state."""
  100.  
  101. def turn_on(self, **kwargs):
  102. """Turn the device on."""
  103. import base64
  104. self._state = True
  105. _LOGGER.info("Running command: %s", self._command_on)
  106. auth = self._broadlink.auth()
  107. if auth:
  108. self._broadlink.send_data(base64.b64decode(self._command_on))
  109.  
  110. def turn_off(self, **kwargs):
  111. """Turn the device off."""
  112. import base64
  113. self._state = False
  114. _LOGGER.info("Running command: %s", self._command_off)
  115. auth = self._broadlink.auth()
  116. if auth:
  117. self._broadlink.send_data(base64.b64decode(self._command_off))
  118.  
  119.  
  120. class Broadlink():
  121. """Broadlink connector class."""
  122.  
  123. class Device:
  124. """Broadlink Device."""
  125.  
  126. def __init__(self, host, mac):
  127. """Initialize the object."""
  128. import socket
  129. import random
  130. import threading
  131. self.host = host
  132. self.mac = mac
  133. self.count = random.randrange(0xffff)
  134.  
  135. self.key = b'\x09\x76\x28\x34\x3f\xe9\x9e'\
  136. b'\x23\x76\x5c\x15\x13\xac\xcf\x8b\x02'
  137. self.ivr = b'\x56\x2e\x17\x99\x6d\x09\x3d\x28\xdd'\
  138. b'\xb3\xba\x69\x5a\x2e\x6f\x58'
  139.  
  140. self.ip_arr = bytearray([0, 0, 0, 0])
  141. self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  142. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  143. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
  144. self.sock.bind(('', 0))
  145. self.lock = threading.Lock()
  146.  
  147. def auth(self):
  148. """Obtain the authentication key."""
  149. from Crypto.Cipher import AES
  150. payload = bytearray(0x50)
  151. payload[0x04] = 0x31
  152. payload[0x05] = 0x31
  153. payload[0x06] = 0x31
  154. payload[0x07] = 0x31
  155. payload[0x08] = 0x31
  156. payload[0x09] = 0x31
  157. payload[0x0a] = 0x31
  158. payload[0x0b] = 0x31
  159. payload[0x0c] = 0x31
  160. payload[0x0d] = 0x31
  161. payload[0x0e] = 0x31
  162. payload[0x0f] = 0x31
  163. payload[0x10] = 0x31
  164. payload[0x11] = 0x31
  165. payload[0x12] = 0x31
  166. payload[0x1e] = 0x01
  167. payload[0x2d] = 0x01
  168. payload[0x30] = ord('T')
  169. payload[0x31] = ord('e')
  170. payload[0x32] = ord('s')
  171. payload[0x33] = ord('t')
  172. payload[0x34] = ord(' ')
  173. payload[0x35] = ord(' ')
  174. payload[0x36] = ord('1')
  175.  
  176. response = self.send_packet(0x65, payload)
  177.  
  178. enc_payload = response[0x38:]
  179.  
  180. aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.ivr))
  181. payload = aes.decrypt(bytes(enc_payload))
  182.  
  183. if payload:
  184. self.ip_arr = payload[0x00:0x04]
  185. self.key = payload[0x04:0x14]
  186. return True
  187. else:
  188. _LOGGER.error('Connection to broadlink device has failed.')
  189. return False
  190.  
  191. def send_packet(self, command, payload, timeout=5.0):
  192. """Send packet to Broadlink device."""
  193. import socket
  194. from Crypto.Cipher import AES
  195. try:
  196. packet = bytearray(0x38)
  197. packet[0x00] = 0x5a
  198. packet[0x01] = 0xa5
  199. packet[0x02] = 0xaa
  200. packet[0x03] = 0x55
  201. packet[0x04] = 0x5a
  202. packet[0x05] = 0xa5
  203. packet[0x06] = 0xaa
  204. packet[0x07] = 0x55
  205. packet[0x24] = 0x2a
  206. packet[0x25] = 0x27
  207. packet[0x26] = command
  208. packet[0x28] = self.count & 0xff
  209. packet[0x29] = self.count >> 8
  210. packet[0x2a] = self.mac[0]
  211. packet[0x2b] = self.mac[1]
  212. packet[0x2c] = self.mac[2]
  213. packet[0x2d] = self.mac[3]
  214. packet[0x2e] = self.mac[4]
  215. packet[0x2f] = self.mac[5]
  216. packet[0x30] = self.ip_arr[0]
  217. packet[0x31] = self.ip_arr[1]
  218. packet[0x32] = self.ip_arr[2]
  219. packet[0x33] = self.ip_arr[3]
  220. except (IndexError, TypeError, NameError):
  221. _LOGGER.error('Invalid IP or MAC address.')
  222. return bytearray(0x30)
  223.  
  224. checksum = 0xbeaf
  225. for i, _ in enumerate(payload):
  226. checksum += payload[i]
  227. checksum = checksum & 0xffff
  228.  
  229. aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.ivr))
  230. payload = aes.encrypt(bytes(payload))
  231.  
  232. packet[0x34] = checksum & 0xff
  233. packet[0x35] = checksum >> 8
  234.  
  235. for i, _ in enumerate(payload):
  236. packet.append(payload[i])
  237.  
  238. checksum = 0xbeaf
  239. for i, _ in enumerate(packet):
  240. checksum += packet[i]
  241. checksum = checksum & 0xffff
  242. packet[0x20] = checksum & 0xff
  243. packet[0x21] = checksum >> 8
  244.  
  245. with self.lock:
  246. self.sock.sendto(packet, self.host)
  247. try:
  248. self.sock.settimeout(timeout)
  249. response = self.sock.recvfrom(1024)
  250. except socket.timeout:
  251. _LOGGER.error("Socket timeout...")
  252. return bytearray(0x30)
  253.  
  254. return response[0]
  255.  
  256. def send_data(self, data):
  257. """Send an IR or RF packet."""
  258. packet = bytearray([0x02, 0x00, 0x00, 0x00])
  259. packet += data
  260. self.send_packet(0x6a, packet)
  261.  
  262. def enter_learning(self):
  263. """Enter learning mode."""
  264. packet = bytearray(16)
  265. packet[0] = 3
  266. self.send_packet(0x6a, packet)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement