'''
This script performs efficient concatenation of files stored in S3. Given a
folder, an output location, and an optional suffix, all files with the given
suffix will be concatenated into one file stored in the output location.

Concatenation is performed within S3 when possible, falling back to local
operations when necessary.

Run `python combineS3Files.py -h` for more info.
'''

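# Example invocation (the folder, output, and suffix values are illustrative):
#   python combineS3Files.py --folder logs/2015/10/13/ --output combined/2015-10-13.log --suffix .log
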
import boto3
import os
import argparse
import logging

# Script expects everything to happen in one bucket
BUCKET = "parquet.data.logs.staging.sandbox"
# Every part of an S3 multipart upload except the last must be at least 5 MB
# (5,242,880 bytes); 6,000,000 gives a little headroom above that floor
MIN_S3_SIZE = 6000000
# Set up the logger to display a timestamp
logging.basicConfig(format='%(asctime)s => %(message)s')

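# NOTE: boto3 resolves credentials through its standard chain (environment
# variables, the shared credentials file, or an instance profile); none are
# hard-coded in this script.
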
def run_concatenation(folder_to_concatenate, result_filepath, file_suffix):
    s3 = new_s3_client()
    parts_list = collect_parts(s3, folder_to_concatenate, file_suffix)
    if len(parts_list) > 1:
        # perform a multipart upload
        upload_id = initiate_concatenation(s3, result_filepath)
        parts_mapping = assemble_parts_to_concatenate(s3, result_filepath, upload_id, parts_list)
        complete_concatenation(s3, result_filepath, upload_id, parts_mapping)
    elif len(parts_list) == 1:
        # can perform a simple S3 copy since there is just a single file
        resp = s3.copy_object(Bucket=BUCKET, CopySource="{}/{}".format(BUCKET, parts_list[0][0]), Key=result_filepath)
        logging.warning("Copied single file in {} to {} and got response {}".format(folder_to_concatenate, result_filepath, resp))
    else:
        logging.warning("No files to concatenate in {} for {}".format(folder_to_concatenate, result_filepath))

def new_s3_client():
    # initialize an S3 client with a private session so that multithreading
    # doesn't cause issues with the client's internal state
    session = boto3.session.Session()
    return session.client('s3')

def collect_parts(s3, folder, suffix):
    # wrap filter() in list() so len() and indexing work under Python 3,
    # where filter() returns a lazy iterator
    return list(filter(lambda x: x[0].endswith(suffix), _list_all_objects_with_size(s3, folder)))
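
# Illustrative example of the return value (keys and sizes are hypothetical):
#   [("logs/2015/10/13/part-00000.log", 8123456), ("logs/2015/10/13/part-00001.log", 4123)]
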
def _list_all_objects_with_size(s3, folder):

    def resp_to_filelist(resp):
        # 'Contents' is absent from the response when the prefix matches no objects
        return [(x['Key'], x['Size']) for x in resp.get('Contents', [])]

    objects_list = []
    resp = s3.list_objects(Bucket=BUCKET, Prefix=folder)
    objects_list.extend(resp_to_filelist(resp))
    while resp['IsTruncated']:
        # if there are more entries than can be returned in one request, the key
        # of the last entry returned acts as a pagination value for the next request
        logging.warning("Found {} objects so far".format(len(objects_list)))
        last_key = objects_list[-1][0]
        resp = s3.list_objects(Bucket=BUCKET, Prefix=folder, Marker=last_key)
        objects_list.extend(resp_to_filelist(resp))

    return objects_list

def initiate_concatenation(s3, result_filename):
    # performing the concatenation in S3 requires creating a multipart upload
    # and then referencing the S3 files we wish to concatenate as "parts" of that upload
    resp = s3.create_multipart_upload(Bucket=BUCKET, Key=result_filename)
    logging.warning("Initiated concatenation attempt for {}, and got response: {}".format(result_filename, resp))
    return resp['UploadId']

def assemble_parts_to_concatenate(s3, result_filename, upload_id, parts_list):
    parts_mapping = []
    part_num = 0

    s3_parts = ["{}/{}".format(BUCKET, p[0]) for p in parts_list if p[1] > MIN_S3_SIZE]
    local_parts = [p[0] for p in parts_list if p[1] <= MIN_S3_SIZE]

    # assemble parts large enough for direct S3 copy
    for part_num, source_part in enumerate(s3_parts, 1):  # part numbers are 1 indexed
        resp = s3.upload_part_copy(Bucket=BUCKET,
                                   Key=result_filename,
                                   PartNumber=part_num,
                                   UploadId=upload_id,
                                   CopySource=source_part)
        logging.warning("Setup S3 part #{}, with path: {}, and got response: {}".format(part_num, source_part, resp))
        # S3 returns ETags wrapped in double quotes; strip them for the parts mapping
        parts_mapping.append({'ETag': resp['CopyPartResult']['ETag'][1:-1], 'PartNumber': part_num})

    # assemble parts too small for direct S3 copy by downloading them locally,
    # combining them, and then reuploading them as the last part of the
    # multipart upload (which is not constrained by the 5 MB minimum)
    small_parts = []
    for source_part in local_parts:
        temp_filename = "/tmp/{}".format(source_part.replace("/", "_"))
        s3.download_file(Bucket=BUCKET, Key=source_part, Filename=temp_filename)

        with open(temp_filename, 'rb') as f:
            small_parts.append(f.read())
        os.remove(temp_filename)
        logging.warning("Downloaded and copied small part with path: {}".format(source_part))

    if len(small_parts) > 0:
        last_part_num = part_num + 1
        # the files were read in binary mode, so join them as bytes
        last_part = b''.join(small_parts)
        resp = s3.upload_part(Bucket=BUCKET, Key=result_filename, PartNumber=last_part_num, UploadId=upload_id, Body=last_part)
        logging.warning("Setup local part #{} from {} small files, and got response: {}".format(last_part_num, len(small_parts), resp))
        parts_mapping.append({'ETag': resp['ETag'][1:-1], 'PartNumber': last_part_num})

    return parts_mapping

def complete_concatenation(s3, result_filename, upload_id, parts_mapping):
    if len(parts_mapping) == 0:
        s3.abort_multipart_upload(Bucket=BUCKET, Key=result_filename, UploadId=upload_id)
        logging.warning("Aborted concatenation for file {}, with upload id #{} due to empty parts mapping".format(result_filename, upload_id))
    else:
        s3.complete_multipart_upload(Bucket=BUCKET, Key=result_filename, UploadId=upload_id, MultipartUpload={'Parts': parts_mapping})
        logging.warning("Finished concatenation for file {}, with upload id #{}, and parts mapping: {}".format(result_filename, upload_id, parts_mapping))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="S3 file combiner")
    parser.add_argument("--folder", required=True, help="folder whose contents should be combined")
    parser.add_argument("--output", required=True, help="output location for the resulting merged file, relative to the base bucket: {}".format(BUCKET))
    parser.add_argument("--suffix", default="", help="suffix of files to include in the combination (defaults to all files)")

    args = parser.parse_args()

    logging.warning("Combining files in {}/{} to {}/{}".format(BUCKET, args.folder, BUCKET, args.output))
    run_concatenation(args.folder, args.output, args.suffix)