Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
async def transfer(self, item: RestoreItem, semaphore):
    """Copy one restore item from its source to its destination.

    The item is downloaded with ``self._download``, the resulting local
    file is uploaded with ``self._upload``, and the local file is deleted
    afterwards whether or not the upload succeeded. Failures are printed
    and recorded through ``self._log_error_notes`` instead of being raised.

    Args:
        item: Object exposing ``source`` and ``destination`` attributes.
        semaphore: Async context manager held for the whole transfer
            (presumably an ``asyncio.Semaphore`` bounding concurrency --
            confirm against the caller).
    """
    # Stays empty until the download has produced a file, so the cleanup
    # below is a no-op when the download itself failed.
    local_path = ""
    async with semaphore:
        try:
            print("processing: " + item.source)
            local_path = await self._download(item.source)
            await self._upload(local_path, item.destination)
            print("done: " + item.source)
        except Exception as err:
            print("failed: " + item.source)
            print("error: " + str(err))
            stack_trace = traceback.format_exc()
            await self._log_error_notes(item.source, err, stack_trace)
        finally:
            # Remove the downloaded copy; previously this checked a variable
            # that was never assigned, so downloaded files were never removed.
            if local_path and os.path.exists(local_path):
                os.remove(local_path)
async def _download(self, s3_path, chunk_size=1024 * 1024):
    """Download an S3 object to a local file under ``self._raw_dir``.

    The local path is ``self._raw_dir`` joined with the object key; missing
    parent directories are created.

    Args:
        s3_path: Location of the object, split into bucket and key by
            ``get_s3_bucket_and_key``.
        chunk_size: Maximum number of bytes requested from the response
            body per read.

    Returns:
        The path of the local file that was written.

    Raises:
        Whatever ``self._s3.get_object`` or the body stream raises; a
        partially written local file is removed before re-raising.
    """
    print('downloading: ', s3_path)
    bucket, key = get_s3_bucket_and_key(s3_path)
    raw_path = os.path.join(self._raw_dir, key)
    raw_dir = os.path.dirname(raw_path)
    if raw_dir:
        os.makedirs(raw_dir, exist_ok=True)
    response = await self._s3.get_object(Bucket=bucket, Key=key)
    try:
        async with response["Body"] as stream:
            # Buffered binary file: a raw FileIO.write may write fewer bytes
            # than requested, which the original code did not account for.
            with open(raw_path, "wb") as file:
                # Bounded reads keep memory use independent of object size.
                while True:
                    data = await stream.read(chunk_size)
                    if not data:
                        break
                    file.write(data)
    except BaseException:
        # The caller never learns raw_path on failure, so clean up here.
        if os.path.exists(raw_path):
            os.remove(raw_path)
        raise
    return raw_path
Add Comment
Please, Sign In to add comment