Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- import asyncio
- import asyncpg
- from time import time
# Optional speed-up: use uvloop's event loop implementation when the package
# is installed. Only a missing package is tolerated; any other failure
# (e.g. while installing the policy) is allowed to surface.
try:
    import uvloop
except ImportError:
    pass  # it will just be slower, no harm done
else:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def await_save(query, block, pool):
    """Write one block of rows using a connection borrowed from ``pool``.

    Args:
        query: Statement handed to ``executemany``.
        block: Sequence of argument rows, one per execution of ``query``.
        pool: Object whose ``acquire()`` result is an async context manager
            yielding a connection with an ``executemany`` coroutine.
    """
    borrowed = pool.acquire()
    # Leaving the ``async with`` block hands the connection back to the pool.
    async with borrowed as connection:
        await connection.executemany(query, block)
def _call_with_params(factory, params):
    """Call ``factory`` with ``params`` as keywords (mapping) or positionally."""
    from collections.abc import Mapping

    if isinstance(params, Mapping):
        return factory(**params)
    return factory(*params)


def _raise_first_failure(tasks):
    """Retrieve every finished task's exception and re-raise the first one."""
    first_failure = None
    for task in tasks:
        if task.cancelled():
            continue
        failure = task.exception()  # marks the exception as retrieved
        if failure is not None and first_failure is None:
            first_failure = failure
    if first_failure is not None:
        raise first_failure


async def process_db(reader_params, reader_query, writer_params, writer_query, block_size=100, block_count=10):
    """Copy the rows of ``reader_query`` into another database block by block.

    Rows are read through a cursor on a single reader connection,
    ``block_size`` rows per fetch. Every fetched block is written by its own
    task running ``await_save`` against a pool of writer connections. Reading
    pauses while more than ``block_count`` write tasks are outstanding.

    Args:
        reader_params: Arguments for ``asyncpg.connect``. A mapping is passed
            as keyword arguments; any other iterable is unpacked positionally.
        reader_query: Query whose result rows are copied.
        writer_params: Arguments for ``asyncpg.create_pool``, interpreted the
            same way as ``reader_params``.
        writer_query: Statement executed once per row of each block.
        block_size: Number of rows fetched and written per block.
        block_count: Number of outstanding write tasks above which reading
            waits for at least one of them to finish.

    Raises:
        Exception: The first exception raised by a write task; the remaining
            outstanding write tasks are cancelled before it propagates.
    """
    loop = asyncio.get_event_loop()
    total_count = 0
    pending = set()
    reader = await _call_with_params(asyncpg.connect, reader_params)
    try:
        writers = await _call_with_params(asyncpg.create_pool, writer_params)
        try:
            # The cursor is only usable inside a transaction on the reader.
            async with reader.transaction():
                cursor = await reader.cursor(reader_query)
                while True:
                    # Back-pressure: do not fetch more rows while too many
                    # blocks are still waiting to be written.
                    while len(pending) > block_count:
                        completed, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                        _raise_first_failure(completed)
                        total_count += len(completed)
                        # Report roughly every 100 completed blocks.
                        if total_count % 100 < len(completed):
                            print(total_count * block_size, "rows")
                    block = await cursor.fetch(block_size)
                    if not block:
                        break
                    pending.add(loop.create_task(await_save(writer_query, block, writers)))
            # asyncio.wait() rejects an empty set, which happens when the
            # query produced no rows or every write has already finished.
            if pending:
                completed, pending = await asyncio.wait(pending)
                _raise_first_failure(completed)
        finally:
            # Only non-empty on an error path: stop the leftover writes so
            # closing the pool does not wait on them.
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
            await writers.close()
    finally:
        await reader.close()
# Settings for the copy job. Every ``...`` is a placeholder that must be
# replaced with a real value before the script is run.
params = {
    "reader_params": dict(host="1.1.1.1", port=..., database=..., user=..., password=...),
    "writer_params": dict(host="2.2.2.2", port=..., database=..., user=..., password=...),
    "reader_query": "select column_a, column_b from table where column_c = 42;",
    "writer_query": "insert into copied_table (col_a, col_b) values ($1, $2);",
}
# Create the loop explicitly: asyncio.get_event_loop() without a running loop
# is deprecated and fails on recent Python versions. new_event_loop() still
# goes through the installed event loop policy.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(process_db(**params))
finally:
    loop.close()
Add Comment
Please, Sign In to add comment