Advertisement
Guest User

Untitled

a guest
Sep 22nd, 2016
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.46 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. import asyncio
  3. import asyncpg
  4. import os
  5. import sys
  6. import shutil
  7. import concurrent.futures
  8. import gzip
  9. import hashlib
  10. import shutil
  11. from datetime import datetime, timedelta
  12.  
  13. USER='planet'
  14. DB='planet'
  15. HOST='planet.cbztihrnp0pu.us-west-2.rds.amazonaws.com'
  16. PASSWORD='planetplanet'
  17. SQL='''SELECT id, bucket_id, name, updated, extract(epoch FROM updated) AS uepoch, extended FROM scenes WHERE updated > $1 ORDER BY updated ASC limit 100;'''
  18.  
  19. MNT='/mnt/efs/extended10'
  20. DAYS_TO_REPLAY=7
  21.  
  22. def ensure_dir(f):
  23. d = os.path.dirname(f)
  24. if not os.path.exists(d):
  25. os.makedirs(d)
  26. return False
  27. return True
  28. ##TODO collision warning
  29.  
  30. def fhash(name):
  31. m = hashlib.md5()
  32. m.update(str.encode(name))
  33. return m.hexdigest()
  34.  
  35. def writed(path, extended):
  36. already_exists = ensure_dir(path)
  37. if not os.path.isfile(path):
  38. if already_exists:
  39. files = os.listdir(os.path.dirname(path))
  40. else:
  41. files = []
  42. f = open(path, 'wb')
  43. f.write(gzip.compress(str.encode(extended)))
  44. f.close()
  45. sys.stdout.write("$")
  46. sys.stdout.flush()
  47. for ext_file in files:
  48. os.remove('{}/{}'.format(os.path.dirname(path), ext_file))
  49. sys.stdout.write("-")
  50. sys.stdout.flush()
  51.  
  52. def dump_file(value):
  53. name = value['name']
  54. bucket_id = str(value['bucket_id'])
  55. epoch = value['uepoch']
  56. updated = value['updated']
  57. extended = value['extended']
  58. # write blank extended if we are blank
  59.  
  60. if extended == None or len(extended) < 1:
  61. extended = ""
  62.  
  63. # TODO maybe add a local timer and hostname to prevent collisions
  64. md5_name = fhash(name)
  65. path= "{}/{}/{}/{}/{}/{}".format(MNT,bucket_id, md5_name[0], md5_name[1:3], name, epoch)
  66. writed(path, extended)
  67. return value['updated']
  68.  
  69. def dump_to_steady_state(values):
  70. last_updated = 0
  71. with concurrent.futures.ThreadPoolExecutor() as e:
  72. return [ result for result in e.map(dump_file, values) ]
  73.  
  74. async def run():
  75. values_to_discover = True
  76. last_updated = datetime.now() - timedelta(days=DAYS_TO_REPLAY)
  77. previous_updated = last_updated
  78. conn = None
  79. cur = None
  80. while True:
  81. try:
  82. fetch_of_cursor_size = 128
  83. conn = await asyncpg.connect(user=USER, password=PASSWORD, database=DB, host=HOST)
  84. async with conn.transaction():
  85. await conn.prepare('set transaction ISOLATION LEVEL READ UNCOMMITTED READ ONLY;')
  86.  
  87. cur = await conn.cursor(SQL, last_updated)
  88. sys.stdout.write(str(last_updated))
  89. sys.stdout.flush()
  90. # fetch it the first time
  91. values = await cur.fetch(fetch_of_cursor_size)
  92. while len(values) > 1:
  93. previous_updated = last_updated
  94. for updated in dump_to_steady_state(values):
  95. if last_updated < updated:
  96. last_updated = updated
  97. values = await cur.fetch(fetch_of_cursor_size)
  98. sys.stdout.write("!")
  99. sys.stdout.flush()
  100.  
  101. await conn.close()
  102. except Exception as e:
  103. if hasattr(conn, 'close'):
  104. await conn.close()
  105. print("error: {0}".format(e))
  106. import time
  107. time.sleep(2)
  108. #replay from worst case
  109. last_updated = previous_updated
  110.  
  111.  
  112. loop = asyncio.get_event_loop()
  113. loop.run_until_complete(run())
  114. loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement