Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import hs_restclient_tony as hsapi
- import multiprocessing as mp
- import os, shutil
- import getpass
- import time
- import sys
- import pickle
- from datetime import datetime
- import argparse
# Refuse to run on interpreters older than Python 3.5.
req_version = (3, 5)
cur_version = sys.version_info
if cur_version < req_version:
    found = '.'.join(str(part) for part in (cur_version.major, cur_version.minor, cur_version.micro))
    banner = '-' * 50
    print('\n' + banner)
    print('ERROR!')
    print('This script requires Python version 3.5.x or higher.\nYou attempted to execute using Python %s' % found)
    print(banner)
    sys.exit(1)
# ---------------------------------------------------------------------------
# Module-level configuration
# ---------------------------------------------------------------------------
# Captured once at import so every run gets a uniquely named report file.
ts = time.strftime("%Y%m%d-%H%M%S")
# Tab-separated report of resources whose download failed.
outname = 'broken-bagit-{}.tsv'.format(ts)
# Passed to hs.getResource as bag_timeout (units not visible here).
tout = 300

# Mutable module state, assigned by test_resource_download() and read by the
# worker functions below.
hs = None          # HydroShare client
skip_failed = 0    # truthy -> do not retry resources cached as failed
cache = None       # resid -> {"message": ..., "status": ...}, plus a 'timestamp' entry
def get_resource(q, iolock, out_q, cnt):
    """Worker loop: try to download each HydroShare resource id read from ``q``.

    Runs until the ``None`` sentinel arrives. Each id is checked against the
    module-level ``cache``:

    * not cached -> download it;
    * cached with a failed status (``status == 0``) -> retry it, unless the
      module-level ``skip_failed`` flag is truthy;
    * anything else -> skip it.

    Every attempt records its outcome in ``cache``. A failure additionally
    pushes one tab-separated report line (resource metadata + error text)
    onto ``out_q``.

    :param q: queue of resource id strings, terminated by ``None``.
    :param iolock: lock guarding updates to ``cnt``.
    :param out_q: queue that receives one report line per failed download.
    :param cnt: shared integer counter (``.value``) of ids seen by all workers.
    """
    current = mp.current_process()
    while True:
        resid = q.get()
        if resid is None:
            break
        with iolock:
            cnt.value += 1
            # Snapshot under the lock so the printed number belongs to this id.
            count = cnt.value

        entry = cache.get(resid)
        # Status 1 means success and 0 means failure (see the writes below).
        # The raw status must not be used as a "failed" flag: that would retry
        # successes and never retry failures.
        failed = entry is not None and entry.get('status') == 0
        should_process = entry is None or (failed and not skip_failed)
        if not should_process:
            print('[%s] %d - Skipping - %s' % (current.name, count, resid), flush=True)
            continue

        print('[%s] %d - Processing - %s' % (current.name, count, resid), flush=True)
        try:
            hs.getResource(resid, destination=None, unzip=True, bag_timeout=int(tout))
        except Exception as e:
            # Store the text, not the exception object. Some exceptions cannot
            # be pickled, and the cache is pickled (to disk, and possibly
            # across processes).
            cache[resid] = {"message": str(e), "status": 0}
            try:
                msg = get_resource_meta(resid)
            except Exception as meta_err:
                # A metadata failure must not kill the worker and strand the queue.
                msg = '%s\tmetadata lookup failed: %s' % (resid, meta_err)
            msg += '\t%s' % e
            out_q.put(msg)
        else:
            cache[resid] = {"message": 'success', "status": 1}
def get_resource_meta(resid):
    """Return the system metadata of ``resid`` as one tab-separated line.

    Values are stringified in the order the metadata mapping yields them.
    The line only lines up with the header that ``writer`` builds from
    another resource's keys if every resource reports the same keys in the
    same order. That is assumed here, not checked.
    """
    values = hs.getSystemMetadata(resid).values()
    return '\t'.join(map(str, values))
def writer(q, header_resid='0359cab7d93f403ba6ee8726cff74f8a'):
    """Drain report lines from ``q`` into the TSV file named by ``outname``.

    The header row is the key set of ``header_resid``'s system metadata,
    followed by an ``error-message`` column. Each message received afterwards
    is written as its own line and flushed immediately, so a partial report
    survives a crash. Reading stops at the string ``'kill'``.

    :param q: queue of report lines (str), terminated by ``'kill'``.
    :param header_resid: resource whose metadata keys form the header. It
        defaults to the id that was previously hard-coded here.
    """
    meta = hs.getSystemMetadata(header_resid)
    header = '\t'.join(meta.keys())
    # ``with`` closes the file even if q.get() or a write raises. An explicit
    # encoding avoids locale-dependent failures on non-ASCII metadata.
    with open(outname, 'w', encoding='utf-8') as f:
        f.write('%s\t%s\n' % (header, 'error-message'))
        for m in iter(q.get, 'kill'):
            f.write(m + '\n')
            f.flush()
def _save_cache(cache_dict, path='cache.pkl'):
    """Pickle ``cache_dict`` to ``path``, closing the file deterministically."""
    with open(path, 'wb') as fh:
        pickle.dump(cache_dict, fh)


def _load_cache(path='cache.pkl'):
    """Load the resource cache from ``path``, or start (and persist) a fresh one.

    A fresh cache contains only a ``'timestamp'`` entry. A stored cache dated
    before today triggers a prompt. Any answer other than ``n``/``N``
    discards it.
    """
    print('--> loading cache... ', end='')
    fresh = {'timestamp': datetime.today()}
    if not os.path.exists(path):
        _save_cache(fresh, path)
        print('done')
        return fresh
    # NOTE(security): pickle.load can execute arbitrary code. This is only
    # acceptable because cache.pkl is written by this script itself. Never
    # load a cache file obtained from elsewhere.
    with open(path, 'rb') as fh:
        loaded = pickle.load(fh)
    print('done')
    stamp = loaded.get('timestamp') if isinstance(loaded, dict) else None
    if not isinstance(stamp, datetime):
        # e.g. a pickled None, or a file in some other format.
        print('--> cache has no valid timestamp, starting a new one')
        _save_cache(fresh, path)
        return fresh
    days_old = abs((datetime.today() - stamp).days)
    if days_old != 0:
        res = input('--> cache is %d days old, do you want to remove it [Y/n]?' % days_old)
        if res.strip().lower() != 'n':
            print('--> removing old cache')
            _save_cache(fresh, path)
            return fresh
    return loaded


def _report_failures(cache_dict):
    """Print ``id: message`` for every cached resource whose status is 0."""
    print('FAILED resource ids')
    for k, v in cache_dict.items():
        if k != 'timestamp' and v['status'] == 0:
            print('%s: %s' % (k, v['message']))


def test_resource_download(u, p, h, s):
    """Try to download every HydroShare resource on host ``h`` in parallel.

    Outcomes are persisted in ``cache.pkl``, so reruns only touch new
    resources (and previously failed ones, unless ``s`` is truthy). Failures
    are also written to the TSV named by ``outname`` (via ``writer``) and
    printed at the end.

    Workers read the module globals ``hs``, ``skip_failed`` and ``cache``.
    This therefore relies on the ``fork`` start method: the globals are
    inherited by the child processes, not passed to them.

    :param u: HydroShare username.
    :param p: HydroShare password.
    :param h: host name, e.g. ``www.hydroshare.org``.
    :param s: truthy to skip resources already cached as failed.
    """
    global hs
    global skip_failed
    global cache
    skip_failed = s
    auth = hsapi.HydroShareAuthBasic(username=u, password=p)
    hs = hsapi.HydroShare(hostname=h, auth=auth, verbose=True)

    # Load the cache BEFORE forking any process: children only see the
    # globals as they were at fork time.
    local_cache = _load_cache()

    print('--> preparing multiprocessing environment... ', end='')
    ncore = mp.cpu_count()
    in_q = mp.Queue(maxsize=ncore)
    out_q = mp.Queue()
    cnt = mp.Value('i', 0)
    iolock = mp.Lock()
    manager = mp.Manager()
    try:
        # A plain dict would be copied into each worker and its updates lost.
        # A manager-backed dict is shared across processes.
        cache = manager.dict(local_cache)
        # Drain failure reports concurrently. If nothing reads out_q, workers
        # block on exit once the pipe buffer fills.
        report_proc = mp.Process(target=writer, args=(out_q,))
        report_proc.start()
        workers = [mp.Process(target=get_resource, args=(in_q, iolock, out_q, cnt))
                   for _ in range(ncore)]
        for w in workers:
            w.start()
        print('done')

        try:
            print('--> getting resource list... ', end='')
            resources = hs.getResourceList()
            print('done')
            print('--> begin iterating HydroShare resources')
            for resource in resources:
                in_q.put(resource['resource_id'])  # blocks while the queue is full
        finally:
            # Always release the workers, even if listing failed part-way.
            for _ in workers:
                in_q.put(None)
            for w in workers:
                w.join()
            out_q.put('kill')
            report_proc.join()

        # Copy the results out of the manager before shutting it down.
        local_cache = cache.copy()
    finally:
        manager.shutdown()
    cache = local_cache

    print('saving the cache... ', end='')
    _save_cache(cache)
    print('done')

    _report_failures(cache)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tests if HydroShare resources can be downloaded")
    parser.add_argument('-u', '--username', required=True, help='HS username')
    parser.add_argument('-p', '--password', required=False, default=None,
                        help='HS password (prompted for if omitted, keeping it out of shell history)')
    parser.add_argument('-H', '--host', required=True, help='host address, e.g. www.hydroshare.org')
    # type=int: without it, "-s 0" arrives as the truthy string '0' and
    # silently enables skipping.
    parser.add_argument('-s', '--skip-failed', type=int, required=False, default=0,
                        help='skip failed resources in cache')
    args = parser.parse_args()
    password = args.password if args.password is not None else getpass.getpass('HS password: ')
    test_resource_download(args.username, password, args.host, args.skip_failed)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement