Advertisement
Guest User

Untitled

a guest
Aug 21st, 2017
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.16 KB | None | 0 0
  1. import hs_restclient_tony as hsapi
  2. import multiprocessing as mp
  3. import os, shutil
  4. import getpass
  5. import time
  6. import sys
  7. import pickle
  8. from datetime import datetime
  9. import argparse
  10.  
  11.  
  12. req_version = (3,5)
  13. cur_version = sys.version_info
  14. if cur_version < req_version:
  15. print('\n' + '-'*50)
  16. print('ERROR!')
  17. print('This script requires Python version 3.5.x or higher.\nYou attempted to execute using Python %s' % ('.'.join([str(v) for v in [cur_version.major, cur_version.minor, cur_version.micro]])))
  18. print('-'*50)
  19. sys.exit(1)
  20. # _-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-
  21.  
  22. # constants
  23. ts = time.strftime("%Y%m%d-%H%M%S")
  24. outname = 'broken-bagit-%s.tsv' % ts
  25. tout = 300
  26.  
  27. # global var, assigned later
  28. hs = None
  29. skip_failed = 0
  30. cache = None
  31.  
  32.  
  33. def get_resource(q, iolock, out_q, cnt):
  34. current = mp.current_process()
  35.  
  36. while True:
  37. resid = q.get()
  38. if resid is None:
  39. break
  40. with iolock:
  41. cnt.value += 1
  42.  
  43. # check the status of the resource IF it exists in the cache
  44. if resid in cache.keys():
  45. failed = cache[resid]['status']
  46.  
  47. # only process resources if they aren't in the cache OR they had a failed status
  48. if resid not in cache.keys() or (failed and not skip_failed):
  49.  
  50. print('[%s] %d - Processing - %s' % (current.name, cnt.value, resid), flush=True)
  51. try:
  52. stream = hs.getResource(resid, destination=None, unzip=True, bag_timeout=int(tout))
  53. cache[resid] = {"message":'success', "status":1}
  54. except Exception as e:
  55. cache[resid] = {"message":e, "status":0}
  56. msg = get_resource_meta(resid)
  57. msg += '\t%s' % e
  58. out_q.put(msg)
  59. else:
  60. print('[%s] %d - Skipping - %s' % (current.name, cnt.value, resid), flush=True)
  61.  
  62. def get_resource_meta(resid):
  63. # get the system metadata for error reporting
  64. meta = hs.getSystemMetadata(resid)
  65. return '\t'.join([str(v) for v in meta.values()])
  66.  
  67. def writer(q):
  68. '''listens for messages on the q, writes to file. '''
  69.  
  70. meta = hs.getSystemMetadata('0359cab7d93f403ba6ee8726cff74f8a')
  71. header = '\t'.join([k for k in meta.keys()])
  72.  
  73. f = open(outname, 'w')
  74. f.write('%s\t%s\n' % (header, 'error-message'))
  75.  
  76. while 1:
  77. m = q.get()
  78. if m == 'kill':
  79. break
  80. f.write(m + '\n')
  81. f.flush()
  82. f.close()
  83.  
  84. def test_resource_download(u, p, h, s):
  85. global hs
  86. global skip_failed
  87. global cache
  88.  
  89. skip_failed = s
  90. auth = hsapi.HydroShareAuthBasic(username=u, password=p)
  91. hs = hsapi.HydroShare(hostname=h, auth=auth, verbose=True)
  92.  
  93. print('--> preparing multiprocessing environment... ', end='')
  94. NCORE = mp.cpu_count()
  95. in_q = mp.Queue(maxsize=NCORE)
  96. out_q = mp.Queue()
  97. cnt = mp.Value('i', 0)
  98. iolock = mp.Lock()
  99. pool = mp.Pool(NCORE, initializer=get_resource, initargs=(in_q, iolock, out_q, cnt))
  100. print('done')
  101.  
  102. # load the cache
  103. print('--> loading cache... ', end='')
  104. if os.path.exists('cache.pkl'):
  105. cache = pickle.load(open('cache.pkl', 'rb'))
  106. print('done')
  107.  
  108. days_old = abs((datetime.today() - cache['timestamp']).days)
  109. if (days_old != 0):
  110. res = input('--> cache is %d days old, do you want to remove it [Y/n]?' % days_old)
  111. if res != 'n':
  112. print('--> removing old cache')
  113. pickle.dump({'timestamp':datetime.today()}, open('cache.pkl', 'wb'))
  114. else:
  115. pickle.dump({'timestamp':datetime.today()}, open('cache.pkl', 'wb'))
  116. print('done')
  117.  
  118.  
  119. print('--> getting resource list... ', end='')
  120. resources = hs.getResourceList()
  121. print('done')
  122.  
  123. print('--> begin iterating HydroShare resources')
  124. for resource in resources:
  125. resid = resource['resource_id']
  126. in_q.put(resid) # blocks until q below its max size
  127.  
  128. for _ in range(NCORE): # tell workers we're done
  129. in_q.put(None)
  130. pool.close()
  131. pool.join()
  132.  
  133. out_q.put('kill')
  134.  
  135. # save the result back to cache
  136. print('saving the cache... ', end='')
  137. pickle.dump(cache, open('cache.pkl', 'wb'))
  138. print('done')
  139.  
  140. # print all failed resources
  141. print('FAILED resource ids')
  142. cache = pickle.load(open('cache.pkl', 'rb'))
  143. for k,v in cache.items():
  144. if k != 'timestamp':
  145. if v['status'] == 0:
  146. print('%s: %s' % (k, v['message']))
  147.  
  148.  
  149. if __name__ == "__main__":
  150. parser = argparse.ArgumentParser(description="Tests if HydroShare resources can be downloaded")
  151. parser.add_argument('-u', '--username', required=1, help='HS username')
  152. parser.add_argument('-p', '--password', required=1, help='HS password')
  153. parser.add_argument('-H', '--host', required=1, help='host address, e.g. www.hydroshare.org')
  154. parser.add_argument('-s', '--skip-failed', required=0, help='skip failed resources in cache', default=0)
  155. args = parser.parse_args()
  156.  
  157. test_resource_download(args.username, args.password, args.host, args.skip_failed)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement