Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from typing import Dict
- from typing import AsyncGenerator
- from typing import Optional
- from typing import Any
- import gpiod
- from ...logging import get_logger
- from ... import aiotools
- from ... import aiogp
- from ...yamlconf import Option
- from ...validators.net import valid_mac
- from ...validators.basic import valid_number
- from ...validators.basic import valid_stripped_string
- from . import AtxIsBusyError
- from . import BaseAtx
- from switchbotpy import Bot
- def valid_hold_time(arg: Any) -> int:
- return int(valid_number(arg, min=0, max=60))
- # =====
- class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
- def __init__( # pylint: disable=too-many-arguments,super-init-not-called
- self,
- device_mac: str,
- click_delay: int,
- long_click_delay: int,
- ) -> None:
- self.__device_mac = device_mac
- self.__click_delay = click_delay
- self.__long_click_delay = long_click_delay
- self.__notifier = aiotools.AioNotifier()
- self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier)
- self.__bot = None
- @classmethod
- def get_plugin_options(cls) -> Dict:
- return {
- "device_mac": Option("BLUETOOTH_ADDR", type=valid_stripped_string),
- "click_delay": Option(0, type=valid_hold_time),
- "long_click_delay": Option(6, type=valid_hold_time),
- }
- def sysprep(self) -> None:
- assert self.__bot is None
- self.__bot = Bot(bot_id=0, mac=self.__device_mac, name="bot0")
- async def get_state(self) -> Dict:
- return {
- "enabled": True,
- "busy": False,
- "leds": {
- "power": True,
- "hdd": False,
- },
- }
- async def poll_state(self) -> AsyncGenerator[Dict, None]:
- while True:
- yield (await self.get_state())
- await aiotools.wait_infinite()
- # =====
- async def power_on(self, wait: bool) -> None:
- await self.click_power(wait)
- async def power_off(self, wait: bool) -> None:
- await self.click_power(wait)
- async def power_off_hard(self, wait: bool) -> None:
- await self.click_power_long(wait)
- async def power_reset_hard(self, wait: bool) -> None:
- await self.click_power_long(wait)
- # =====
- async def click_power(self, wait: bool) -> None:
- await self.__click("power", self.__click_delay, wait)
- async def click_power_long(self, wait: bool) -> None:
- await self.__click("power_long", self.__long_click_delay, wait)
- # =====
- @aiotools.atomic
- async def __click(self, name: str, delay: int, wait: bool) -> None:
- if wait:
- async with self.__region:
- await self.__inner_click(name, delay)
- else:
- await aiotools.run_region_task(
- f"Can't perform ATX {name} click or operation was not completed",
- self.__region, self.__inner_click, name, delay,
- )
- @aiotools.atomic
- async def __inner_click(self, name: str, delay: int) -> None:
- self.__bot.set_hold_time(delay)
- self.__bot.press()
- get_logger(0).info("Clicked ATX button %r", name)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement