Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from asyncio.events import AbstractEventLoop
- from typing import List, Callable, Optional, Any
- import asyncpg
- from asyncpg.pool import Pool
- from asyncpg.connection import Connection
- from .types import Table, Column, ProxyModel, PhoneModel
- from ..communicate.types import Phone
- from ..proxy.types import Proxy
- from ..helpers.time import get_utc_now_timestamp
def _acquire_connection(method: Callable) -> Callable:
    """Run a ``Database`` coroutine method on a pooled connection inside a transaction.

    The wrapper acquires a connection from ``self._pool``, opens a
    transaction on it, and awaits ``method`` with that connection injected
    as a keyword argument.

    The decorated methods spell the injected parameter ``__connect`` inside
    a class body, where Python rewrites the name to
    ``_<ClassName>__connect``.  This decorator is defined at module level,
    so a literal ``__connect=...`` keyword written here is *not* rewritten
    and the method would reject it with ``TypeError``.  The real parameter
    name is therefore read from the method's signature once, at decoration
    time.

    Args:
        method: Coroutine function taking ``self`` plus a parameter whose
            name is, or ends with, ``__connect``.

    Returns:
        A coroutine function that callers invoke without the connection
        argument; it returns whatever ``method`` returns.
    """
    # Imported locally so this block does not depend on file-level imports.
    import functools
    import inspect

    connect_keyword = next(
        (
            name
            for name in inspect.signature(method).parameters
            if name.endswith("__connect")
        ),
        "__connect",  # fallback, e.g. when the method only accepts **kwargs
    )

    @functools.wraps(method)  # keep the method's name/docstring on the wrapper
    async def wrapper(self: "Database", *args, **kwargs) -> Any:
        async with self._pool.acquire() as connect:
            async with connect.transaction():
                injected = {connect_keyword: connect}
                return await method(self, *args, **kwargs, **injected)

    return wrapper
class Database:
    """Async data-access layer over an asyncpg pool for used-proxy and phone-usage rows.

    Methods decorated with ``_acquire_connection`` receive their connection
    through the trailing ``__connect`` parameter; callers do not pass it.

    NOTE: because that parameter is spelled inside the class body, Python
    mangles it to ``_Database__connect``, so whatever injects the connection
    has to target the mangled name.
    """

    def __init__(self, pool: Pool, loop: Optional[AbstractEventLoop] = None):
        """Wrap an existing connection pool.

        Args:
            pool: Pool that every query method acquires its connection from.
            loop: Event loop stored on the instance; when omitted,
                ``asyncio.get_event_loop()`` is used.
        """
        self._pool = pool
        self.loop = asyncio.get_event_loop() if loop is None else loop

    @classmethod
    async def init_from_auth(cls, host: str, port: int, database: str,
                             user: str, password: str,
                             loop: Optional[AbstractEventLoop] = None) -> "Database":
        """Create a pool from connection credentials and wrap it in a ``Database``.

        Args:
            host: Server host name.
            port: Server port.
            database: Database name.
            user: Login role.
            password: Login password.
            loop: Optional event loop forwarded to the pool and the instance.

        Returns:
            A new instance backed by the freshly created pool.
        """
        credentials = dict(host=host, port=port, database=database,
                           user=user, password=password)
        new_pool = await asyncpg.create_pool(**credentials, loop=loop)
        return cls(pool=new_pool, loop=loop)

    @_acquire_connection
    async def create_tables(self, __connect: Connection) -> None:
        """Create the used-proxies and phone-usage tables when they are missing."""
        # NOTE(review): INTEGER is a 4-byte type in PostgreSQL; confirm the
        # stored timestamps are guaranteed to fit.
        proxies_ddl = f"""
            CREATE TABLE IF NOT EXISTS {Table.USED_PROXIES} (
            {Column.RECORD_ID} SERIAL PRIMARY KEY,
            {Column.PROXY} TEXT UNIQUE NOT NULL,
            {Column.ADD_UTC_TIMESTAMP} INTEGER NOT NULL
            )
            """
        await __connect.execute(proxies_ddl)

        phones_ddl = f"""
            CREATE TABLE IF NOT EXISTS {Table.USAGE_PHONES} (
            {Column.RECORD_ID} SERIAL PRIMARY KEY,
            {Column.PHONE} TEXT NOT NULL,
            {Column.ADD_UTC_TIMESTAMP} INTEGER NOT NULL
            )
            """
        await __connect.execute(phones_ddl)

    @_acquire_connection
    async def get_used_proxies(self, __connect: Connection) -> List[ProxyModel]:
        """Return every row of the used-proxies table as ``ProxyModel`` objects."""
        query = f"""
            SELECT *
            FROM {Table.USED_PROXIES}
            """
        rows = await __connect.fetch(query)
        return [ProxyModel.init_from_star_record(record=row) for row in rows]

    @_acquire_connection
    async def get_phone_usage(self, phone: Phone, __connect: Connection) -> List[PhoneModel]:
        """Return the usage rows recorded for ``phone`` as ``PhoneModel`` objects.

        Args:
            phone: Phone to look up; matched by its ``str()`` form.
        """
        query = f"""
            SELECT *
            FROM {Table.USAGE_PHONES}
            WHERE {Column.PHONE}=$1
            """
        phone_text = str(phone)
        rows = await __connect.fetch(query, phone_text)
        return [PhoneModel.init_from_star_record(record=row) for row in rows]

    @_acquire_connection
    async def add_used_proxy(self, proxy: Proxy, __connect: Connection) -> None:
        """Insert ``proxy`` (as ``str(proxy)``) with the current UTC timestamp.

        Args:
            proxy: Proxy to record as used.
        """
        statement = f"""
            INSERT INTO {Table.USED_PROXIES} ({Column.PROXY}, {Column.ADD_UTC_TIMESTAMP})
            VALUES ($1, $2)
            """
        proxy_text = str(proxy)
        added_at = get_utc_now_timestamp()
        await __connect.execute(statement, proxy_text, added_at)

    @_acquire_connection
    async def add_phone_usage(self, phone: Phone, __connect: Connection) -> None:
        """Insert a usage row for ``phone`` (as ``str(phone)``) with the current UTC timestamp.

        Args:
            phone: Phone whose usage is being recorded.
        """
        statement = f"""
            INSERT INTO {Table.USAGE_PHONES} ({Column.PHONE}, {Column.ADD_UTC_TIMESTAMP})
            VALUES ($1, $2)
            """
        phone_text = str(phone)
        added_at = get_utc_now_timestamp()
        await __connect.execute(statement, phone_text, added_at)

    async def close(self) -> None:
        """Close the underlying connection pool."""
        await self._pool.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement