eparla774

network

Apr 2nd, 2021 (edited)
384
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 12.99 KB | None | 0 0
  1. """Info control for host."""
  2. from __future__ import annotations
  3.  
  4. import asyncio
  5. from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
  6. import logging
  7. from typing import List, Optional, Union
  8.  
  9. import attr
  10.  
  11. from ..const import ATTR_HOST_INTERNET
  12. from ..coresys import CoreSys, CoreSysAttributes
  13. from ..dbus.const import (
  14.     DBUS_NAME_NM_CONNECTION_ACTIVE_CHANGED,
  15.     ConnectionStateType,
  16.     DeviceType,
  17.     InterfaceMethod as NMInterfaceMethod,
  18.     WirelessMethodType,
  19. )
  20. from ..dbus.network.accesspoint import NetworkWirelessAP
  21. from ..dbus.network.connection import NetworkConnection
  22. from ..dbus.network.interface import NetworkInterface
  23. from ..dbus.payloads.generate import interface_update_payload
  24. from ..exceptions import (
  25.     DBusError,
  26.     DBusNotConnectedError,
  27.     DBusProgramError,
  28.     HostNetworkError,
  29.     HostNetworkNotFound,
  30.     HostNotSupportedError,
  31. )
  32. from .const import AuthMethod, InterfaceMethod, InterfaceType, WifiMode
  33.  
  34. _LOGGER: logging.Logger = logging.getLogger(__name__)
  35.  
  36.  
  37. class NetworkManager(CoreSysAttributes):
  38.     """Handle local network setup."""
  39.  
  40.     def __init__(self, coresys: CoreSys):
  41.         """Initialize system center handling."""
  42.         self.coresys: CoreSys = coresys
  43.         self._connectivity: Optional[bool] = None
  44.  
  45.     @property
  46.     def connectivity(self) -> Optional[bool]:
  47.         """Return true current connectivity state."""
  48.         return self._connectivity
  49.  
  50.     @connectivity.setter
  51.     def connectivity(self, state: bool) -> None:
  52.         """Set host connectivity state."""
  53.         if self._connectivity == state:
  54.             return
  55.         self._connectivity = state
  56.         self.sys_homeassistant.websocket.supervisor_update_event(
  57.             "network", {ATTR_HOST_INTERNET: state}
  58.         )
  59.  
  60.     @property
  61.     def interfaces(self) -> List[Interface]:
  62.         """Return a dictionary of active interfaces."""
  63.         interfaces: List[Interface] = []
  64.         for inet in self.sys_dbus.network.interfaces.values():
  65.             interfaces.append(Interface.from_dbus_interface(inet))
  66.  
  67.         return interfaces
  68.  
  69.     @property
  70.     def dns_servers(self) -> List[str]:
  71.         """Return a list of local DNS servers."""
  72.         # Read all local dns servers
  73.         servers: List[str] = []
  74.         for config in self.sys_dbus.network.dns.configuration:
  75.             if config.vpn or not config.nameservers:
  76.                 continue
  77.             servers.extend(config.nameservers)
  78.  
  79.         return list(dict.fromkeys(servers))
  80.  
  81.     async def check_connectivity(self):
  82.         """Check the internet connection.
  83.  
  84.        ConnectionState 4 == FULL (has internet)
  85.        https://developer.gnome.org/NetworkManager/stable/nm-dbus-types.html#NMConnectivityState
  86.        """
  87.         if not self.sys_dbus.network.connectivity_enabled:
  88.             return
  89.  
  90.         # Check connectivity
  91.         try:
  92.             state = await self.sys_dbus.network.check_connectivity()
  93.             self.connectivity = state[0] == 4
  94.         except DBusError as err:
  95.             _LOGGER.warning("Can't update connectivity information: %s", err)
  96.             self.connectivity = False
  97.  
  98.     def get(self, inet_name: str) -> Interface:
  99.         """Return interface from interface name."""
  100.         if inet_name not in self.sys_dbus.network.interfaces:
  101.             raise HostNetworkNotFound()
  102.  
  103.         return Interface.from_dbus_interface(
  104.             self.sys_dbus.network.interfaces[inet_name]
  105.         )
  106.  
  107.     async def update(self):
  108.         """Update properties over dbus."""
  109.         _LOGGER.info("Updating local network information")
  110.         try:
  111.             await self.sys_dbus.network.update()
  112.         except DBusError:
  113.             _LOGGER.warning("Can't update network information!")
  114.         except DBusNotConnectedError as err:
  115.             _LOGGER.error("No network D-Bus connection available")
  116.             raise HostNotSupportedError() from err
  117.  
  118.         await self.check_connectivity()
  119.  
  120.     async def apply_changes(self, interface: Interface) -> None:
  121.         """Apply Interface changes to host."""
  122.         inet = self.sys_dbus.network.interfaces.get(interface.name)
  123.  
  124.         # Update exist configuration
  125.         if (
  126.             inet
  127.             and inet.settings
  128.             and inet.settings.connection.interface_name == interface.name
  129.             and interface.enabled
  130.         ):
  131.             settings = interface_update_payload(
  132.                 interface,
  133.                 name=inet.settings.connection.id,
  134.                 uuid=inet.settings.connection.uuid,
  135.             )
  136.  
  137.             try:
  138.                 await inet.settings.update(settings)
  139.                 await self.sys_dbus.network.activate_connection(
  140.                     inet.settings.object_path, inet.object_path
  141.                 )
  142.             except DBusError as err:
  143.                 _LOGGER.error("Can't update config on %s: %s", interface.name, err)
  144.                 raise HostNetworkError() from err
  145.  
  146.         # Create new configuration and activate interface
  147.         elif inet and interface.enabled:
  148.             settings = interface_update_payload(interface)
  149.  
  150.             try:
  151.                 await self.sys_dbus.network.add_and_activate_connection(
  152.                     settings, inet.object_path
  153.                 )
  154.             except DBusError as err:
  155.                 _LOGGER.error(
  156.                     "Can't create config and activate %s: %s", interface.name, err
  157.                 )
  158.                 raise HostNetworkError() from err
  159.  
  160.         # Remove config from interface
  161.         elif inet and inet.settings and not interface.enabled:
  162.             try:
  163.                 await inet.settings.delete()
  164.             except DBusError as err:
  165.                 _LOGGER.error("Can't disable interface %s: %s", interface.name, err)
  166.                 raise HostNetworkError() from err
  167.  
  168.         # Create new interface (like vlan)
  169.         elif not inet:
  170.             settings = interface_update_payload(interface)
  171.  
  172.             try:
  173.                 await self.sys_dbus.network.settings.add_connection(settings)
  174.             except DBusError as err:
  175.                 _LOGGER.error("Can't create new interface: %s", err)
  176.                 raise HostNetworkError() from err
  177.         else:
  178.             _LOGGER.warning("Requested Network interface update is not possible")
  179.             raise HostNetworkError()
  180.  
  181.         await self.sys_dbus.network.dbus.wait_signal(
  182.             DBUS_NAME_NM_CONNECTION_ACTIVE_CHANGED
  183.         )
  184.         await self.update()
  185.  
  186.     async def scan_wifi(self, interface: Interface) -> List[AccessPoint]:
  187.         """Scan on Interface for AccessPoint."""
  188.         inet = self.sys_dbus.network.interfaces.get(interface.name)
  189.  
  190.         if inet.type != DeviceType.WIRELESS:
  191.             _LOGGER.error("Can only scan with wireless card - %s", interface.name)
  192.             raise HostNotSupportedError()
  193.  
  194.         # Request Scan
  195.         try:
  196.             await inet.wireless.request_scan()
  197.         except DBusProgramError as err:
  198.             _LOGGER.debug("Can't request a new scan: %s", err)
  199.         except DBusError as err:
  200.             raise HostNetworkError() from err
  201.         else:
  202.             await asyncio.sleep(5)
  203.  
  204.         # Process AP
  205.         accesspoints: List[AccessPoint] = []
  206.         for ap_object in (await inet.wireless.get_all_accesspoints())[0]:
  207.             accesspoint = NetworkWirelessAP(ap_object)
  208.  
  209.             try:
  210.                 await accesspoint.connect()
  211.             except DBusError as err:
  212.                 _LOGGER.warning("Can't process an AP: %s", err)
  213.                 continue
  214.             else:
  215.                 accesspoints.append(
  216.                     AccessPoint(
  217.                         WifiMode[WirelessMethodType(accesspoint.mode).name],
  218.                         accesspoint.ssid,
  219.                         accesspoint.mac,
  220.                         accesspoint.frequency,
  221.                         accesspoint.strength,
  222.                     )
  223.                 )
  224.  
  225.         return accesspoints
  226.  
  227.  
  228. @attr.s(slots=True)
  229. class AccessPoint:
  230.     """Represent a wifi configuration."""
  231.  
  232.     mode: WifiMode = attr.ib()
  233.     ssid: str = attr.ib()
  234.     mac: str = attr.ib()
  235.     frequency: int = attr.ib()
  236.     signal: int = attr.ib()
  237.  
  238.  
  239. @attr.s(slots=True)
  240. class IpConfig:
  241.     """Represent a IP configuration."""
  242.  
  243.     method: InterfaceMethod = attr.ib()
  244.     address: List[Union[IPv4Interface, IPv6Interface]] = attr.ib()
  245.     gateway: Optional[Union[IPv4Address, IPv6Address]] = attr.ib()
  246.     nameservers: List[Union[IPv4Address, IPv6Address]] = attr.ib()
  247.  
  248.  
  249. @attr.s(slots=True)
  250. class WifiConfig:
  251.     """Represent a wifi configuration."""
  252.  
  253.     mode: WifiMode = attr.ib()
  254.     ssid: str = attr.ib()
  255.     auth: AuthMethod = attr.ib()
  256.     psk: Optional[str] = attr.ib()
  257.     signal: Optional[int] = attr.ib()
  258.  
  259.  
  260. @attr.s(slots=True)
  261. class VlanConfig:
  262.     """Represent a vlan configuration."""
  263.  
  264.     id: int = attr.ib()
  265.     interface: str = attr.ib()
  266.  
  267.  
  268. @attr.s(slots=True)
  269. class Interface:
  270.     """Represent a host network interface."""
  271.  
  272.     name: str = attr.ib()
  273.     enabled: bool = attr.ib()
  274.     connected: bool = attr.ib()
  275.     primary: bool = attr.ib()
  276.     type: InterfaceType = attr.ib()
  277.     ipv4: Optional[IpConfig] = attr.ib()
  278.     ipv6: Optional[IpConfig] = attr.ib()
  279.     wifi: Optional[WifiConfig] = attr.ib()
  280.     vlan: Optional[VlanConfig] = attr.ib()
  281.  
  282.     @staticmethod
  283.     def from_dbus_interface(inet: NetworkInterface) -> Interface:
  284.         """Concert a dbus interface into normal Interface."""
  285.         return Interface(
  286.             inet.name,
  287.             inet.settings is not None,
  288.             Interface._map_nm_connected(inet.connection),
  289.             inet.primary,
  290.             Interface._map_nm_type(inet.type),
  291.             IpConfig(
  292.                 Interface._map_nm_method(inet.settings.ipv4.method),
  293.                 inet.connection.ipv4.address,
  294.                 inet.connection.ipv4.gateway,
  295.                 inet.connection.ipv4.nameservers,
  296.             )
  297.             if inet.connection and inet.connection.ipv4
  298.             else IpConfig(InterfaceMethod.DISABLED, [], None, []),
  299.             IpConfig(
  300.                 Interface._map_nm_method(inet.settings.ipv6.method),
  301.                 inet.connection.ipv6.address,
  302.                 inet.connection.ipv6.gateway,
  303.                 inet.connection.ipv6.nameservers,
  304.             )
  305.             if inet.connection and inet.connection.ipv6
  306.             else IpConfig(InterfaceMethod.DISABLED, [], None, []),
  307.             Interface._map_nm_wifi(inet),
  308.             Interface._map_nm_vlan(inet),
  309.         )
  310.  
  311.     @staticmethod
  312.     def _map_nm_method(method: str) -> InterfaceMethod:
  313.         """Map IP interface method."""
  314.         mapping = {
  315.             NMInterfaceMethod.AUTO: InterfaceMethod.AUTO,
  316.             NMInterfaceMethod.DISABLED: InterfaceMethod.DISABLED,
  317.             NMInterfaceMethod.MANUAL: InterfaceMethod.STATIC,
  318.         }
  319.  
  320.         return mapping.get(method, InterfaceMethod.DISABLED)
  321.  
  322.     @staticmethod
  323.     def _map_nm_connected(connection: Optional[NetworkConnection]) -> bool:
  324.         """Map connectivity state."""
  325.         if not connection:
  326.             return False
  327.  
  328.         return connection.state in (
  329.             ConnectionStateType.ACTIVATED,
  330.             ConnectionStateType.ACTIVATING,
  331.         )
  332.  
  333.     @staticmethod
  334.     def _map_nm_type(device_type: int) -> InterfaceType:
  335.         mapping = {
  336.             DeviceType.ETHERNET: InterfaceType.ETHERNET,
  337.             DeviceType.WIRELESS: InterfaceType.WIRELESS,
  338.             DeviceType.VLAN: InterfaceType.VLAN,
  339.         }
  340.         return mapping[device_type]
  341.  
  342.     @staticmethod
  343.     def _map_nm_wifi(inet: NetworkInterface) -> Optional[WifiConfig]:
  344.         """Create mapping to nm wifi property."""
  345.         if inet.type != DeviceType.WIRELESS or not inet.settings:
  346.             return None
  347.  
  348.         # Authentication
  349.         auth = None
  350.         if not inet.settings.wireless_security:
  351.             auth = AuthMethod.OPEN
  352.         elif inet.settings.wireless_security.key_mgmt == "none":
  353.                 auth = AuthMethod.WEP
  354.         elif inet.settings.wireless_security.key_mgmt == "wpa-psk":
  355.                 auth = AuthMethod.WPA_PSK
  356.  
  357.         # WifiMode
  358.         mode = WifiMode.INFRASTRUCTURE
  359.         if inet.settings.wireless.mode:
  360.             mode = WifiMode(inet.settings.wireless.mode)
  361.  
  362.         # Signal
  363.         if inet.wireless:
  364.             signal = inet.wireless.active.strength
  365.         else:
  366.             signal = None
  367.  
  368.         if not inet.settings.wireless_security:
  369.             passwd = None
  370.         else:
  371.             passwd = inet.settings.wireless_security.psk
  372.  
  373.         return WifiConfig(
  374.             mode,
  375.             inet.settings.wireless.ssid,
  376.             auth,
  377.             passwd,
  378.             signal,
  379.         )
  380.  
  381.     @staticmethod
  382.     def _map_nm_vlan(inet: NetworkInterface) -> Optional[WifiConfig]:
  383.         """Create mapping to nm vlan property."""
  384.         if inet.type != DeviceType.VLAN or not inet.settings:
  385.             return None
  386.  
  387.         return VlanConfig(inet.settings.vlan.id, inet.settings.vlan.parent)
Add Comment
Please, Sign In to add comment