Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from concurrent.futures import ThreadPoolExecutor
- import sys
- from playwright.sync_api import BrowserContext, sync_playwright
- from playwright.sync_api._generated import ConsoleMessage
- class BaseBrowser:
- DEFAULT_CONTEXT = 'Desktop Safari'
- def __init__(
- self,
- /,
- device_name: str | None = None,
- headless: bool = True,
- ) -> None:
- self._device_name = device_name or self.DEFAULT_CONTEXT
- self._headless = headless
- def start(self) -> BrowserContext:
- self._playwright = sync_playwright().start()
- self._browser = self._playwright.webkit.launch(
- headless=self._headless,
- )
- device = self._playwright.devices[self._device_name]
- self._context = self._browser.new_context(**device)
- return self._context
- def close(self) -> None:
- self._context.close()
- self._browser.close()
- self._playwright.stop()
- self._set_asyncio_policy(asyncio.WindowsSelectorEventLoopPolicy())
- def _set_asyncio_policy(
- self,
- policy: asyncio.WindowsSelectorEventLoopPolicy | asyncio.DefaultEventLoopPolicy | None
- ) -> None:
- if sys.version_info >= (3, 8) and sys.platform.lower().startswith('win'):
- asyncio.set_event_loop_policy(policy)
class Hsw(BaseBrowser):
    """Run hCaptcha's ``hsw.js`` in a browser page and return its result.

    The script is loaded into a blank page and its global ``hsw`` function
    is called with the token given at construction time.
    """

    # Script that defines the page-level ``hsw`` function.
    HSW_SCRIPT_URL = 'https://newassets.hcaptcha.com/c/2458d9b/hsw.js'

    def __init__(
        self,
        token: str
    ) -> None:
        """Remember the *token* that will be passed to ``hsw``."""
        super().__init__()
        self._token = token
        # Last value produced by _get_hsw(); None until it has run.
        self._hsw_token = None

    async def get(self) -> str:
        """Compute the ``hsw`` result without blocking the event loop.

        The blocking sync-Playwright work runs in a worker thread.

        Returns:
            The value the page's ``hsw(token)`` promise resolved to.
        """
        with ThreadPoolExecutor() as pool:
            # NOTE(review): presumably the default policy is needed on
            # Windows so the loop Playwright creates in the worker thread
            # can spawn its driver subprocess - confirm.
            self._set_asyncio_policy(asyncio.DefaultEventLoopPolicy())
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                pool, self._get_hsw, self.HSW_SCRIPT_URL,
            )

    def _on_token(self, message: ConsoleMessage) -> None:
        """Store the text of a console message as the current token.

        Retained for backward compatibility; ``_get_hsw`` does not register
        it, because any unrelated console output would overwrite the result.
        """
        self._hsw_token = message.text

    def _get_hsw(self, url: str) -> str:
        """Load the script at *url* and return the result of ``hsw(token)``.

        Blocking; intended to run in a worker thread. The browser is always
        closed, whether or not the page work succeeds.

        Args:
            url: Location of the script that defines ``hsw``.

        Returns:
            The resolved value of the promise returned by ``hsw``.
        """
        self._hsw_token = None
        # start() is outside the try so a launch failure is reported as-is.
        context = self.start()
        try:
            page = context.new_page()
            page.add_script_tag(url=url)
            # evaluate() waits for the promise returned by hsw() and hands
            # back its resolved value, so no console listener is needed.
            # The token is passed as an argument rather than interpolated
            # into the script text, so quotes in it cannot break the script.
            self._hsw_token = page.evaluate('token => hsw(token)', self._token)
        finally:
            self.close()
        return self._hsw_token
async def main():
    """Compute the hsw result for a sample token and print it."""
    sample_token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJmIjowLCJzIjoyLCJ0IjoidyIsImQiOiJuZVZwYWZrT0FRNnhNNUJCV3Vwa01HdWM0ZFVxWGVRS0YwY0xiRlA5L2MrTVo0WlkxaWlvT3hvY3lqRFE4RGJYT2R6MGFsVmFmeEhENVBIWHUyYWFjUHBNUXRIL0t1RUEvNndDRVVGNWhIQnJEMUNuNUZnd2JwdkdOUVJ1WkdXVXZrTlVFRnJhS0lnaW9uOTg1bHl2eW1jQmU4SmlES25oc3MrcklsdkVGU2lPSEpJQ1JpNVo3Mzd3RzM4eDlKZC9adGNBTngyaEdFYy91THpSaUE4OUNPdmU2RVdXTG4rTWNTTmxwVVE4WHVDTmt4akFwWmlodTJzN1B3L3daUzNRIiwibCI6Imh0dHBzOi8vbmV3YXNzZXRzLmhjYXB0Y2hhLmNvbS9jLzNhODRjMTUiLCJpIjoic2hhMjU2LXV4L0ZDV2M4ZS9vWTN1cGFINnRVeWpQN2hMbGhwR295OG45ZVdWdVdNS009IiwiZSI6MTcwNTMyOTA3MywibiI6ImhzdyIsImMiOjEwMDB9.lZsCbqodE16TzXJclh4oYIVHmJH12zV23BvzJEzFo8c'
    solver = Hsw(sample_token)
    result = await solver.get()
    print(result)
if __name__ == '__main__':
    # Script entry point: on Windows (Python 3.8+) install the selector
    # event-loop policy before the loop is created, then run main().
    running_on_windows = sys.platform.lower().startswith('win')
    if running_on_windows and sys.version_info >= (3, 8):
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement