Advertisement
WoJ

Untitled

WoJ
Dec 5th, 2018
238
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.58 KB | None | 0 0
  1. """
  2. Support for Unifi AP direct access.
  3.  
  4. For more details about this platform, please refer to the documentation at
  5. https://home-assistant.io/components/device_tracker.unifi_direct/
  6. """
  7. import logging
  8. import json
  9. import re
  10.  
  11. import voluptuous as vol
  12.  
  13. import homeassistant.helpers.config_validation as cv
  14. from homeassistant.components.device_tracker import (
  15.     DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
  16. from homeassistant.const import (
  17.     CONF_HOST, CONF_PASSWORD, CONF_USERNAME,
  18.     CONF_PORT)
  19.  
  20. REQUIREMENTS = ['pexpect==4.6.0']
  21.  
  22. _LOGGER = logging.getLogger(__name__)
  23.  
  24. DEFAULT_SSH_PORT = 22
  25. UNIFI_COMMAND = 'mca-dump | tr -d "\n"'
  26. UNIFI_SSID_TABLE = "vap_table"
  27. UNIFI_CLIENT_TABLE = "sta_table"
  28.  
  29. PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
  30.     vol.Required(CONF_HOST): cv.string,
  31.     vol.Required(CONF_PASSWORD): cv.string,
  32.     vol.Required(CONF_USERNAME): cv.string,
  33.     vol.Optional(CONF_PORT, default=DEFAULT_SSH_PORT): cv.port
  34. })
  35.  
  36.  
  37. def get_scanner(hass, config):
  38.     """Validate the configuration and return a Unifi direct scanner."""
  39.     scanner = UnifiDeviceScanner(config[DOMAIN])
  40.     if not scanner.connected:
  41.         return False
  42.     return scanner
  43.  
  44.  
  45. class UnifiDeviceScanner(DeviceScanner):
  46.     """This class queries Unifi wireless access point."""
  47.  
  48.     def __init__(self, config):
  49.         """Initialize the scanner."""
  50.         self.host = config[CONF_HOST]
  51.         self.username = config[CONF_USERNAME]
  52.         self.password = config[CONF_PASSWORD]
  53.         self.port = config[CONF_PORT]
  54.         self.ssh = None
  55.         self.connected = False
  56.         self.last_results = {}
  57.         self._connect()
  58.  
  59.     def scan_devices(self):
  60.         """Scan for new devices and return a list with found device IDs."""
  61.         result = _response_to_json(self._get_update())
  62.         if result:
  63.             self.last_results = result
  64.         return self.last_results.keys()
  65.  
  66.     def get_device_name(self, device):
  67.         """Return the name of the given device or None if we don't know."""
  68.         hostname = next((
  69.             value.get('hostname') for key, value in self.last_results.items()
  70.             if key.upper() == device.upper()), None)
  71.         if hostname is not None:
  72.             hostname = str(hostname)
  73.         return hostname
  74.  
  75.     def _connect(self):
  76.         """Connect to the Unifi AP SSH server."""
  77.         from pexpect import pxssh, exceptions
  78.  
  79.         self.ssh = pxssh.pxssh()
  80.         try:
  81.             self.ssh.login(self.host, self.username,
  82.                            password=self.password, port=self.port)
  83.             self.connected = True
  84.         except exceptions.EOF:
  85.             _LOGGER.error("Connection refused. SSH enabled?")
  86.             self._disconnect()
  87.  
  88.     def _disconnect(self):
  89.         """Disconnect the current SSH connection."""
  90.         try:
  91.             self.ssh.logout()
  92.         except Exception:  # pylint: disable=broad-except
  93.             pass
  94.         finally:
  95.             self.ssh = None
  96.  
  97.         self.connected = False
  98.  
  99.     def _get_update(self):
  100.         from pexpect import pxssh, exceptions
  101.  
  102.         try:
  103.             if not self.connected:
  104.                 self._connect()
  105.             # If we still aren't connected at this point
  106.             # don't try to send anything to the AP.
  107.             if not self.connected:
  108.                 return None
  109.             self.ssh.sendline(UNIFI_COMMAND)
  110.             self.ssh.prompt()
  111.             return self.ssh.before
  112.         except pxssh.ExceptionPxssh as err:
  113.             _LOGGER.error("Unexpected SSH error: %s", str(err))
  114.             self._disconnect()
  115.             return None
  116.         except (AssertionError, exceptions.EOF) as err:
  117.             _LOGGER.error("Connection to AP unavailable: %s", str(err))
  118.             self._disconnect()
  119.             return None
  120.  
  121.  
  122. def _response_to_json(response):
  123.     try:
  124.         data = str(response)[31:-1].replace("\\", "")
  125.         # the deserialization is temporarly changed to account for problems with the JSON provided by the AP
  126.         # if ESSIDs contain quotes ("), these are not correctly escaped by the AP and the resulting JSON is wrong
  127.         # the idea is to remove them and hope for the best.
  128.         # Worst case the whole block is swallowed, whihc is better than a systematic crash
  129.         # I will remove this once the bug is fixed by Unifi, a bug report has been raised (by me)
  130.         while True:
  131.             # since there may be that the JSON is so weird that the whole block is swallowed, an empty response is
  132.             # provided just in case
  133.             if not data:
  134.                 json_response = ''
  135.                 _LOGGER.warning("the JSON is so weird that the deserialization failed despite trying -> empty response")
  136.                 break
  137.             else:
  138.                 try:
  139.                     json_response = json.loads(data)
  140.                 except json.decoder.JSONDecodeError as e:
  141.                     s = int(re.search(r"\.*char (\d+)", str(e)).group(1)) - 2
  142.                     _LOGGER.warning("incorrect character at position {s}, removing".format(s=s))
  143.                     data = data[:s] + data[(s + 1):]
  144.                 else:
  145.                     break
  146.         _LOGGER.debug(str(json_response))
  147.         ssid_table = json_response.get(UNIFI_SSID_TABLE)
  148.         active_clients = {}
  149.  
  150.         for ssid in ssid_table:
  151.             client_table = ssid.get(UNIFI_CLIENT_TABLE)
  152.             for client in client_table:
  153.                 active_clients[client.get("mac")] = client
  154.  
  155.         return active_clients
  156.     except ValueError:
  157.         _LOGGER.error("Failed to decode response from AP.")
  158.         return {}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement