Advertisement
Guest User

switchbot pikvm

a guest
Apr 16th, 2024
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.28 KB | None | 0 0
  1. from typing import Dict
  2. from typing import AsyncGenerator
  3. from typing import Optional
  4. from typing import Any
  5.  
  6. import gpiod
  7.  
  8. from ...logging import get_logger
  9.  
  10. from ... import aiotools
  11. from ... import aiogp
  12.  
  13. from ...yamlconf import Option
  14.  
  15. from ...validators.net import valid_mac
  16. from ...validators.basic import valid_number
  17. from ...validators.basic import valid_stripped_string
  18.  
  19. from . import AtxIsBusyError
  20. from . import BaseAtx
  21.  
  22. from switchbotpy import Bot
  23.  
  24.  
  25. def valid_hold_time(arg: Any) -> int:
  26.     return int(valid_number(arg, min=0, max=60))
  27.  
  28.  
  29. # =====
  30. class Plugin(BaseAtx):  # pylint: disable=too-many-instance-attributes
  31.     def __init__(  # pylint: disable=too-many-arguments,super-init-not-called
  32.         self,
  33.         device_mac: str,
  34.         click_delay: int,
  35.         long_click_delay: int,
  36.     ) -> None:
  37.  
  38.         self.__device_mac = device_mac
  39.         self.__click_delay = click_delay
  40.         self.__long_click_delay = long_click_delay
  41.  
  42.         self.__notifier = aiotools.AioNotifier()
  43.         self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier)
  44.         self.__bot = None
  45.  
  46.  
  47.     @classmethod
  48.     def get_plugin_options(cls) -> Dict:
  49.         return {
  50.             "device_mac":       Option("BLUETOOTH_ADDR", type=valid_stripped_string),
  51.             "click_delay":      Option(0, type=valid_hold_time),
  52.             "long_click_delay": Option(6, type=valid_hold_time),
  53.         }
  54.  
  55.     def sysprep(self) -> None:
  56.         assert self.__bot is None
  57.  
  58.         self.__bot = Bot(bot_id=0, mac=self.__device_mac, name="bot0")
  59.  
  60.  
  61.     async def get_state(self) -> Dict:
  62.         return {
  63.             "enabled": True,
  64.             "busy": False,
  65.             "leds": {
  66.                 "power": True,
  67.                 "hdd": False,
  68.             },
  69.         }
  70.  
  71.     async def poll_state(self) -> AsyncGenerator[Dict, None]:
  72.         while True:
  73.             yield (await self.get_state())
  74.             await aiotools.wait_infinite()
  75.  
  76.  
  77.     # =====
  78.  
  79.     async def power_on(self, wait: bool) -> None:
  80.         await self.click_power(wait)
  81.  
  82.     async def power_off(self, wait: bool) -> None:
  83.         await self.click_power(wait)
  84.  
  85.     async def power_off_hard(self, wait: bool) -> None:
  86.         await self.click_power_long(wait)
  87.  
  88.     async def power_reset_hard(self, wait: bool) -> None:
  89.         await self.click_power_long(wait)
  90.  
  91.  
  92.     # =====
  93.  
  94.     async def click_power(self, wait: bool) -> None:
  95.         await self.__click("power", self.__click_delay, wait)
  96.  
  97.     async def click_power_long(self, wait: bool) -> None:
  98.         await self.__click("power_long", self.__long_click_delay, wait)
  99.  
  100.  
  101.     # =====
  102.  
  103.     @aiotools.atomic
  104.     async def __click(self, name: str, delay: int, wait: bool) -> None:
  105.         if wait:
  106.             async with self.__region:
  107.                 await self.__inner_click(name, delay)
  108.         else:
  109.             await aiotools.run_region_task(
  110.                 f"Can't perform ATX {name} click or operation was not completed",
  111.                 self.__region, self.__inner_click, name, delay,
  112.             )
  113.  
  114.     @aiotools.atomic
  115.     async def __inner_click(self, name: str, delay: int) -> None:
  116.         self.__bot.set_hold_time(delay)
  117.         self.__bot.press()
  118.         get_logger(0).info("Clicked ATX button %r", name)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement