Advertisement
Guest User

Untitled

a guest
Nov 21st, 2016
642
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.35 KB | None | 0 0
  1. #!/usr/bin/python
  2.  
  3. import os
  4. import binascii
  5. import base64
  6.  
  7. ip_addr='10.0.0.10'
  8. mac_addr='B4:43:0D:C3:F8:42'
  9.  
  10. class Broadlink():
  11. """Broadlink connector class."""
  12.  
  13. class Device:
  14. """Broadlink Device."""
  15.  
  16. def __init__(self, host, mac):
  17. """Initialize the object."""
  18. import socket
  19. import random
  20. import threading
  21. self.host = host
  22. self.mac = mac
  23. self.count = random.randrange(0xffff)
  24.  
  25. self.key = b'\x09\x76\x28\x34\x3f\xe9\x9e'\
  26. b'\x23\x76\x5c\x15\x13\xac\xcf\x8b\x02'
  27. self.ivr = b'\x56\x2e\x17\x99\x6d\x09\x3d\x28\xdd'\
  28. b'\xb3\xba\x69\x5a\x2e\x6f\x58'
  29.  
  30. self.ip_arr = bytearray([0, 0, 0, 0])
  31. self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  32. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  33. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
  34. self.sock.bind(('', 0))
  35. self.lock = threading.Lock()
  36.  
  37. def auth(self):
  38. """Obtain the authentication key."""
  39. from Crypto.Cipher import AES
  40. payload = bytearray(0x50)
  41. payload[0x04] = 0x31
  42. payload[0x05] = 0x31
  43. payload[0x06] = 0x31
  44. payload[0x07] = 0x31
  45. payload[0x08] = 0x31
  46. payload[0x09] = 0x31
  47. payload[0x0a] = 0x31
  48. payload[0x0b] = 0x31
  49. payload[0x0c] = 0x31
  50. payload[0x0d] = 0x31
  51. payload[0x0e] = 0x31
  52. payload[0x0f] = 0x31
  53. payload[0x10] = 0x31
  54. payload[0x11] = 0x31
  55. payload[0x12] = 0x31
  56. payload[0x1e] = 0x01
  57. payload[0x2d] = 0x01
  58. payload[0x30] = ord('T')
  59. payload[0x31] = ord('e')
  60. payload[0x32] = ord('s')
  61. payload[0x33] = ord('t')
  62. payload[0x34] = ord(' ')
  63. payload[0x35] = ord(' ')
  64. payload[0x36] = ord('1')
  65.  
  66. response = self.send_packet(0x65, payload)
  67.  
  68. enc_payload = response[0x38:]
  69.  
  70. aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.ivr))
  71. payload = aes.decrypt(bytes(enc_payload))
  72.  
  73. if payload:
  74. self.ip_arr = payload[0x00:0x04]
  75. self.key = payload[0x04:0x14]
  76. return True
  77. else:
  78. _LOGGER.error('Connection to broadlink device has failed.')
  79. return False
  80.  
  81. def send_packet(self, command, payload, timeout=5.0):
  82. """Send packet to Broadlink device."""
  83. import socket
  84. from Crypto.Cipher import AES
  85. try:
  86. packet = bytearray(0x38)
  87. packet[0x00] = 0x5a
  88. packet[0x01] = 0xa5
  89. packet[0x02] = 0xaa
  90. packet[0x03] = 0x55
  91. packet[0x04] = 0x5a
  92. packet[0x05] = 0xa5
  93. packet[0x06] = 0xaa
  94. packet[0x07] = 0x55
  95. packet[0x24] = 0x2a
  96. packet[0x25] = 0x27
  97. packet[0x26] = command
  98. packet[0x28] = self.count & 0xff
  99. packet[0x29] = self.count >> 8
  100. packet[0x2a] = self.mac[0]
  101. packet[0x2b] = self.mac[1]
  102. packet[0x2c] = self.mac[2]
  103. packet[0x2d] = self.mac[3]
  104. packet[0x2e] = self.mac[4]
  105. packet[0x2f] = self.mac[5]
  106. packet[0x30] = self.ip_arr[0]
  107. packet[0x31] = self.ip_arr[1]
  108. packet[0x32] = self.ip_arr[2]
  109. packet[0x33] = self.ip_arr[3]
  110. except (IndexError, TypeError, NameError):
  111. _LOGGER.error('Invalid IP or MAC address.')
  112. return bytearray(0x30)
  113.  
  114. checksum = 0xbeaf
  115. for i, _ in enumerate(payload):
  116. checksum += payload[i]
  117. checksum = checksum & 0xffff
  118.  
  119. aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.ivr))
  120. payload = aes.encrypt(bytes(payload))
  121.  
  122. packet[0x34] = checksum & 0xff
  123. packet[0x35] = checksum >> 8
  124.  
  125. for i, _ in enumerate(payload):
  126. packet.append(payload[i])
  127.  
  128. checksum = 0xbeaf
  129. for i, _ in enumerate(packet):
  130. checksum += packet[i]
  131. checksum = checksum & 0xffff
  132. packet[0x20] = checksum & 0xff
  133. packet[0x21] = checksum >> 8
  134.  
  135. with self.lock:
  136. self.sock.sendto(packet, self.host)
  137. try:
  138. self.sock.settimeout(timeout)
  139. response = self.sock.recvfrom(1024)
  140. except socket.timeout:
  141. _LOGGER.error("Socket timeout...")
  142. return bytearray(0x30)
  143.  
  144. return response[0]
  145.  
  146. def send_data(self, data):
  147. """Send an IR or RF packet."""
  148. packet = bytearray([0x02, 0x00, 0x00, 0x00])
  149. packet += data
  150. self.send_packet(0x6a, packet)
  151.  
  152. def enter_learning(self):
  153. """Enter learning mode."""
  154. packet = bytearray(16)
  155. packet[0] = 3
  156. self.send_packet(0x6a, packet)
  157.  
  158. def check_data(self):
  159. from Crypto.Cipher import AES
  160. packet = bytearray(16)
  161. packet[0] = 4
  162. response = self.send_packet(0x6a, packet)
  163. err = response[0x22] | (response[0x23] << 8)
  164. if err == 0:
  165. aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.ivr))
  166. payload = aes.decrypt(bytes(response[0x38:]))
  167. return payload[0x04:]
  168.  
  169.  
  170. def main():
  171. os.system('clear')
  172.  
  173. device = Broadlink.Device(host=(ip_addr, 80),mac=binascii.unhexlify(mac_addr.encode().replace(b':', b'')))
  174. device.auth()
  175.  
  176. print("\n\n Learning mode... Please press the key on the remote control.")
  177. device.enter_learning()
  178.  
  179. while True:
  180. packet=device.check_data()
  181. if packet:
  182. print("\n\n Your packet is: \n\n {} \n\n".format(base64.b64encode(packet).decode('utf8')))
  183. break
  184.  
  185. if __name__ == "__main__":
  186. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement