Guest User

Untitled

a guest
Dec 13th, 2018
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.47 KB | None | 0 0
  1. from __future__ import unicode_literals
  2.  
  3. import argparse
  4. import csv
  5. import errno
  6. import logging
  7. import multiprocessing
  8. import os
  9. import shutil
  10. import time
  11. import traceback
  12. import sys
  13. import time
  14.  
  15. from PIL import Image
  16.  
  17. import requests
  18. import six
  19.  
  20. CONTAINER="image_Storage"
  21.  
  22. import swiftclient
  23.  
  24.  
  25.  
  26. def config_logger():
  27. logger = logging.getLogger('download')
  28. logger.setLevel(logging.DEBUG)
  29.  
  30. ch = logging.StreamHandler()
  31. ch.setLevel(logging.DEBUG)
  32.  
  33. formatter = logging.Formatter('%(process)d @ %(asctime)s (%(relativeCreated)d) '
  34. '%(name)s - %(levelname)s - %(message)s')
  35. ch.setFormatter(formatter)
  36. logger.addHandler(ch)
  37.  
  38. return logger
  39.  
  40.  
  41. def parse_args():
  42. parser = argparse.ArgumentParser(description='Download Google open image dataset.')
  43.  
  44. parser.add_argument('--timeout', type=float, default=2.0,
  45. help='image download timeout')
  46. parser.add_argument('--queue-size', type=int, default=1000,
  47. help='maximum image url queue size')
  48. parser.add_argument('--consumers', type=int, default=1,
  49. help='number of download workers')
  50. parser.add_argument('--min-dim', type=int, default=256,
  51. help='smallest dimension for the aspect ratio preserving scale'
  52. '(-1 for no scale)')
  53. parser.add_argument('--sub-dirs', type=int, default=1000,
  54. help='number of directories to split downloads over')
  55. parser.add_argument('--force', default=False, action='store_true',
  56. help='force download and overwrite local files')
  57.  
  58. parser.add_argument('input', help='open image input csv')
  59.  
  60. return parser.parse_args()
  61.  
  62.  
  63. def unicode_dict_reader(f, **kwargs):
  64. csv_reader = csv.DictReader(f, **kwargs)
  65. for row in csv_reader:
  66. yield {key: value for key, value in six.iteritems(row)}
  67.  
  68.  
  69.  
  70. def read_image(response, min_dim):
  71. """ Download response in chunks and convert to a scaled Image object """
  72.  
  73. content = six.BytesIO()
  74. shutil.copyfileobj(response.raw, content)
  75. content.seek(0)
  76.  
  77. return content
  78.  
  79.  
  80. def consumer(args, queue):
  81. """ Whilst the queue has images, download and save them """
  82.  
  83.  
  84. swift = swiftclient.client.Connection(
  85. auth_version=OS_IDENTITY_API_VERSION,
  86. user=OS_USERNAME,
  87. key=OS_PASSWORD,
  88. authurl=OS_AUTH_URL,
  89. tenant_name=OS_TENANT_NAME,
  90. os_options={'region_name':OS_REGION_NAME}
  91. )
  92.  
  93. while queue.empty():
  94. time.sleep(0.5) # give the queue a chance to populate
  95.  
  96. while not queue.empty():
  97. code, url = queue.get(block=True, timeout=None)
  98. #t0 = time.time()
  99. #c0 = time.clock()
  100. try:
  101. response = requests.get(url, stream=True, timeout=args.timeout)
  102. image = read_image(response, args.min_dim)
  103. #print('Time to read image', time.time()-t0)
  104. #print('Time to read image', time.clock()-c0)
  105. #image.save(out_path)
  106. #image.save('images/'+code+'.jpg')
  107. #swift.put_object('image_Storage', 'images/'+code+'.jpg', open('images/'+code+'.jpg'))
  108. #t0 = time.time()
  109. swift.put_object('image_Storage', 'imageTest/'+code+'.jpg', image)
  110. #print('Time to upload on swift:', time.clock()-c0)
  111. #log.debug('saving {} to {}'.format(url, code+'.jpg'))
  112. #os.remove('images/'+code+'.jpg')
  113. except Exception:
  114. log.warning('error {}'.format(traceback.format_exc()))
  115.  
  116. if queue.empty:
  117. time.sleep(0.1)
  118.  
  119. log.debug('!!!!!!!!!!!!! \n EXITING COSUMER \n !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
  120.  
  121. def producer(args, queue):
  122. """ Populate the queue with image_id, url pairs. """
  123.  
  124. with open(args.input) as f:
  125. for row in unicode_dict_reader(f):
  126. queue.put([row['ImageID'], row['OriginalURL']], block=True, timeout=None)
  127. #log.debug('queue_size = {}'.format(queue.qsize()))
  128.  
  129. queue.close()
  130.  
  131.  
  132. log = config_logger()
  133.  
  134.  
  135. if __name__ == '__main__':
  136. args = parse_args()
  137. log.debug(args)
  138.  
  139. #queue = multiprocessing.Queue(args.queue_size)
  140. queue = multiprocessing.Queue(10000)
  141. processes = [
  142. multiprocessing.Process(target=producer, args=(args, queue))
  143. ]
  144.  
  145. for i in range(args.consumers):
  146. processes.append(multiprocessing.Process(target=consumer, args=(args, queue)))
  147. print('There is :', len(processes), 'processes' )
  148.  
  149. for p in processes:
  150. p.start()
  151.  
  152. for p in processes:
  153. p.join()
Add Comment
Please, Sign In to add comment