Guest User

Untitled

a guest
Jul 19th, 2018
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.24 KB | None | 0 0
  1. async def transfer(self, item: RestoreItem, semaphore):
  2. raw_path = ""
  3. async with semaphore:
  4. try:
  5. print("processing: " + item.source)
  6. local_path = await self._download(item.source)
  7. await self._upload(local_path, item.destination)
  8. print("done: " + item.source)
  9. except Exception as err:
  10. print("failed: " + item.source)
  11. print("error: " + str(err))
  12. stack_trace = traceback.format_exc()
  13. await self._log_error_notes(item.source, err, stack_trace)
  14. finally:
  15. if raw_path and os.path.exists(raw_path):
  16. os.remove(raw_path)
  17.  
  18. async def _download(self, s3_path):
  19. print('downloading: ', s3_path)
  20. bucket, key = get_s3_bucket_and_key(s3_path)
  21. raw_path = os.path.join(self._raw_dir, key)
  22. raw_dir = os.path.dirname(raw_path)
  23. if not os.path.exists(raw_dir):
  24. os.makedirs(raw_dir)
  25.  
  26. response = await self._s3.get_object(Bucket=bucket, Key=key)
  27. async with response["Body"] as stream:
  28. with io.FileIO(raw_path, "w") as file:
  29. data = await stream.read()
  30. while data:
  31. file.write(data)
  32. data = await stream.read()
  33.  
  34. return raw_path
Add Comment
Please, Sign In to add comment