Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python3
"""Tail the `scenes` table and mirror each row's `extended` payload to EFS."""
import asyncio
import concurrent.futures
import gzip
import hashlib
import os
import shutil
import sys
from datetime import datetime, timedelta

import asyncpg

# SECURITY NOTE(review): database credentials are hard-coded in source.
# Move these to environment variables or a secrets manager before this
# script is shared or deployed.
USER = 'planet'
DB = 'planet'
HOST = 'planet.cbztihrnp0pu.us-west-2.rds.amazonaws.com'
PASSWORD = 'planetplanet'

# Replay query: rows strictly newer than $1, oldest first, batched by LIMIT.
SQL = '''SELECT id, bucket_id, name, updated, extract(epoch FROM updated) AS uepoch, extended FROM scenes WHERE updated > $1 ORDER BY updated ASC limit 100;'''

# Root of the EFS mount that receives the dumped payload files.
MNT = '/mnt/efs/extended10'
# On a cold start, begin replaying this many days in the past.
DAYS_TO_REPLAY = 7
def ensure_dir(f):
    """Ensure the parent directory of file path *f* exists.

    Returns True when the directory already existed, False when this call
    created it. Uses EAFP (`makedirs` + `FileExistsError`) instead of the
    original `exists()` check, which was racy when several workers create
    the same directory concurrently.
    """
    d = os.path.dirname(f)
    try:
        os.makedirs(d)
    except FileExistsError:
        # Already present (or a concurrent worker won the race).
        return True
    return False
- ##TODO collision warning
def fhash(name):
    """Return the hex MD5 digest of *name* (used to shard output paths)."""
    digest = hashlib.md5(name.encode())
    return digest.hexdigest()
def writed(path, extended):
    """Write *extended* gzip-compressed to *path*, then remove stale siblings.

    Sibling files in the same directory are earlier epoch dumps of the same
    scene; they are snapshotted BEFORE the new file is written so the new
    file itself is never deleted. No-op when *path* already exists.
    Progress markers on stdout: "$" per file written, "-" per stale removal.
    """
    already_exists = ensure_dir(path)
    if os.path.isfile(path):
        # Payload for this exact epoch is already on disk.
        return
    if already_exists:
        files = os.listdir(os.path.dirname(path))
    else:
        files = []
    # Context manager closes the handle even if the write raises
    # (the original bare open/close leaked on error).
    with open(path, 'wb') as f:
        f.write(gzip.compress(extended.encode()))
    sys.stdout.write("$")
    sys.stdout.flush()
    for ext_file in files:
        os.remove('{}/{}'.format(os.path.dirname(path), ext_file))
        sys.stdout.write("-")
        sys.stdout.flush()
def dump_file(value):
    """Dump one DB row's `extended` payload to its sharded EFS path.

    *value* is a record-like mapping with keys `name`, `bucket_id`,
    `uepoch`, `updated`, `extended`. Returns the row's `updated` timestamp
    so the caller can advance its replay cursor.
    """
    name = value['name']
    bucket_id = str(value['bucket_id'])
    epoch = value['uepoch']
    extended = value['extended']
    # Normalize a missing payload to an empty file rather than skipping it.
    if extended is None or len(extended) < 1:
        extended = ""
    # TODO maybe add a local timer and hostname to prevent collisions
    md5_name = fhash(name)
    # Layout: MNT/<bucket>/<h0>/<h1h2>/<name>/<epoch> — the two MD5 prefix
    # levels shard directories to keep listings small.
    path = "{}/{}/{}/{}/{}/{}".format(MNT, bucket_id, md5_name[0], md5_name[1:3], name, epoch)
    writed(path, extended)
    return value['updated']
def dump_to_steady_state(values):
    """Dump *values* (DB rows) to disk in parallel.

    Returns the list of `updated` timestamps, one per row, in input order.
    """
    # Threads suffice: the work is I/O-bound (gzip + EFS writes release
    # the GIL during file I/O).
    with concurrent.futures.ThreadPoolExecutor() as e:
        return list(e.map(dump_file, values))
async def run():
    """Follow the `scenes` table forever, dumping payloads as rows appear.

    Starts DAYS_TO_REPLAY days in the past and advances a replay cursor on
    `updated`. On any error the connection is dropped, the cursor rolls
    back to the last known-good timestamp, and the loop retries after a
    short pause.
    """
    last_updated = datetime.now() - timedelta(days=DAYS_TO_REPLAY)
    previous_updated = last_updated
    conn = None
    while True:
        try:
            fetch_of_cursor_size = 128
            conn = await asyncpg.connect(user=USER, password=PASSWORD, database=DB, host=HOST)
            async with conn.transaction():
                # BUG FIX: the original called conn.prepare(), which only
                # parses a statement and never runs it, so the transaction
                # settings were silently never applied. execute() runs it.
                await conn.execute('set transaction ISOLATION LEVEL READ UNCOMMITTED READ ONLY;')
                cur = await conn.cursor(SQL, last_updated)
                sys.stdout.write(str(last_updated))
                sys.stdout.flush()
                # Prime the cursor, then drain it batch by batch.
                values = await cur.fetch(fetch_of_cursor_size)
                # NOTE(review): `> 1` drops a final batch of exactly one
                # row; it is re-fetched on the next connection pass.
                # Confirm `> 0` was not intended here.
                while len(values) > 1:
                    previous_updated = last_updated
                    for updated in dump_to_steady_state(values):
                        if last_updated < updated:
                            last_updated = updated
                    values = await cur.fetch(fetch_of_cursor_size)
                    sys.stdout.write("!")
                    sys.stdout.flush()
            await conn.close()
        except Exception as e:
            # conn is None until the first successful connect.
            if hasattr(conn, 'close'):
                await conn.close()
            print("error: {0}".format(e))
            # BUG FIX: time.sleep(2) blocked the entire event loop inside
            # an async def; asyncio.sleep yields control while waiting.
            await asyncio.sleep(2)
            # Replay from the worst-case known-good position.
            last_updated = previous_updated
if __name__ == '__main__':
    # Guard so importing this module does not start the replay loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run())
    finally:
        # Original skipped close() whenever run() raised (e.g. Ctrl-C).
        loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement