Guest User

Untitled

a guest
Nov 10th, 2017
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.49 KB | None | 0 0
  1. #!/usr/bin/python3
  2. import asyncio
  3. import asyncpg
  4. from time import time
  5. try:
  6. import uvloop
  7. asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  8. except:
  9. pass # it will just be slower, no harm done
  10.  
  11. async def await_save(query, block, pool):
  12. async with pool.acquire() as conn:
  13. await conn.executemany(query, block)
  14.  
  15. async def process_db(reader_params, reader_query, writer_params, writer_query, block_size=100, block_count=10):
  16. loop = asyncio.get_event_loop()
  17. total_count = 0
  18. reader = await asyncpg.connect(*reader_params)
  19. writers = await asyncpg.create_pool(*writer_params)
  20. pending = set()
  21. async with reader.transaction():
  22. cursor = await reader.cursor(reader_query)
  23. while 1:
  24. while len(pending) > block_count:
  25. completed, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
  26. total_count += len(completed)
  27. if total_count % 100 < len(completed):
  28. print(total_count * block_size, "rows")
  29. block = await cursor.fetch(block_size)
  30. if not block:
  31. break
  32. pending.add(loop.create_task(await_save(writer_query, block, writers)))
  33. await asyncio.wait(pending)
  34.  
  35. params = {
  36. "reader_params": {host="1.1.1.1", port=..., database=..., user=..., password=...},
  37. "writer_params": {host="2.2.2.2", port=..., database=..., user=..., password=...},
  38. "reader_query": "select column_a, column_b from table where column_c = 42;",
  39. "writer_query": "insert into copied_table (col_a, col_b) values ($1, $2);",
  40. }
  41. loop = asyncio.get_event_loop()
  42. loop.run_until_complete(process_db(**params))
Add Comment
Please, Sign In to add comment