Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import asyncpg
- from asyncpg.pool import Pool
- from asyncpg.connection import Connection
- from constants import PG_USER, PG_PASSWORD, PG_HOST, PG_PORT
# Event loop shared by the connection pool below and by the script entry point.
# asyncio.get_event_loop() raises RuntimeError when no current loop is set
# (always in non-main threads, and in the main thread too from Python 3.14),
# so fall back to creating a loop and registering it as the current one.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

# Module-wide asyncpg connection pool. It is created eagerly at import time by
# driving the create_pool() coroutine to completion on `loop`.
_pool: Pool = loop.run_until_complete(
    asyncpg.create_pool(
        host=PG_HOST,
        port=PG_PORT,
        database="database",
        user=PG_USER,
        password=PG_PASSWORD,
    )
)
def connection(coro):
    """Decorate a coroutine function so it runs on a pooled connection.

    The returned wrapper acquires a connection from the module-level
    ``_pool``, opens a transaction on it, and awaits ``coro`` with that
    connection supplied as the ``connect`` keyword argument. Both the
    transaction and the connection are released when their ``async with``
    blocks exit.

    Args:
        coro: Coroutine function that accepts a ``connect`` keyword argument.

    Returns:
        A coroutine function taking the same arguments as ``coro`` except
        ``connect``; it returns whatever ``coro`` returns. Callers must not
        pass ``connect`` themselves, since the wrapper already supplies it.
    """
    # Local import so the decorator does not depend on a file-level import.
    from functools import wraps

    # Preserve the decorated function's name and docstring; without this every
    # decorated function would be reported as "wrapper".
    @wraps(coro)
    async def wrapper(*args, **kwargs):
        # Take a connection from the connection pool.
        async with _pool.acquire() as connect:
            # Open a transaction on the acquired connection.
            async with connect.transaction():
                return await coro(*args, connect=connect, **kwargs)

    return wrapper
@connection
async def create_tables(connect: Connection):
    """Create the ``phones`` table if it does not already exist.

    ``connect`` is supplied by the ``connection`` decorator.
    """
    phones_ddl = "CREATE TABLE IF NOT EXISTS phones (id SERIAL PRIMARY KEY, phone TEXT UNIQUE)"
    await connect.execute(phones_ddl)
    # ...
@connection
async def insert_phone(phone: str, connect: Connection):
    """Insert ``phone`` as a new row of the ``phones`` table.

    ``connect`` is supplied by the ``connection`` decorator; callers pass
    only ``phone``. Nothing is returned.
    """
    statement = "INSERT INTO phones (phone) VALUES ($1)"
    await connect.execute(statement, phone)
@connection
async def check_connection(text_plz: str, connect: Connection):
    """Insert ``text_plz`` into the ``text_plz`` column of ``dich_plz``.

    ``connect`` is supplied by the ``connection`` decorator; callers pass
    only ``text_plz``. Nothing is returned.
    """
    # NOTE(review): the visible part of create_tables does not create
    # `dich_plz` -- confirm the table exists before relying on this call.
    statement = "INSERT INTO dich_plz (text_plz) VALUES ($1)"
    await connect.execute(statement, text_plz)
@connection
async def get_phone_record(phone: str, connect: Connection):
    """Look up the ``phones`` row whose ``phone`` column equals ``phone``.

    ``connect`` is supplied by the ``connection`` decorator; callers pass
    only ``phone``.

    Returns:
        The result of ``connect.fetchrow`` for the query (presumably the
        matching row, or ``None`` when there is none -- confirm against the
        asyncpg documentation).
    """
    query = "SELECT * FROM phones WHERE phone=$1"
    record = await connect.fetchrow(query, phone)
    return record
async def close():
    """Close the module-level connection pool ``_pool``."""
    shutdown = _pool.close()
    await shutdown
async def main():
    """Run the application body, then close the connection pool.

    The body is currently a placeholder. ``close()`` sits in ``finally`` so
    the pool is closed even if the body raises.
    """
    try:
        ...  # placeholder for the application logic
    finally:
        await close()
if __name__ == "__main__":
    # Script entry point: drive main() to completion on the module-level
    # loop, and close that loop afterwards whether or not main() raised.
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement