Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Script for pulling MediaExpress 360 data from S3 and loading to Redshift
- from __future__ import print_function
- from os import environ
- from libs.s3 import S3
- from libs.database import Database
- from libs.tools import copy_s3_object
# Redshift connection settings, read from the process environment at import
# time.  A missing variable raises KeyError immediately, which is the
# desired fail-fast behavior for a batch script.
connection = {
    param: environ['REDSHIFT_' + suffix]
    for param, suffix in (('host', 'HOST'),
                          ('user', 'USER'),
                          ('database', 'DB'),
                          ('port', 'PORT'))
}
class Mex360(object):
    """Loads MediaExpress 360 usage CSVs from S3 into a Redshift table.

    On construction this lists both the source and the archive buckets and
    computes ``new_files``: source keys that are not yet present in the
    archive bucket (i.e. have not been loaded before).
    """

    def __init__(self):
        self.database = Database(connection)
        self.table_name = 'public.mex_360'
        self.source_bucket = 'tr-agency-reporting-data'
        self.source_prefix = 'video_360_usage/'
        self.archive_bucket = 'rtv-mex-360'
        # Order must match the field order in the source CSV files.
        self.columns = ['account_id', 'account_name', 'account_country',
                        'account_region', 'user_first_name', 'user_last_name',
                        'user_country', 'user_region', 'delivery_method',
                        'usn', 'version', 'media_type', 'user_action',
                        'event_time', 'slug', 'headline']
        # Source files are delimited with the SOH (Ctrl-A) character.
        self.delimiter = '\001'
        self.source_files = S3().list_bucket(self.source_bucket,
                                             prefix=self.source_prefix)
        self.archive_files = [f.key for f in
                              S3().list_bucket(self.archive_bucket)]
        # Set for O(1) membership tests; the public attribute above stays a
        # list for backward compatibility.
        archived = set(self.archive_files)
        self.new_files = [f for f in self.source_files
                          if f.key not in archived]

    def key_to_uri(self, key):
        """Returns an ``s3://bucket/key`` URI string from a Boto Key object.
        """
        return 's3://{}/{}'.format(key.bucket.name, key.key)

    def introspect(self, all_time=False):
        """Prints a JSON string containing attribute names and maximum string
        length of that attribute for all CSVs in source data.

        Args:
            all_time: if True, scan every source file; otherwise only the
                files that have not yet been archived.
        """
        import csv
        import json
        try:  # Python 2
            from cStringIO import StringIO
        except ImportError:  # Python 3
            from io import StringIO

        files = self.source_files if all_time else self.new_files
        # Concatenate all files into one in-memory buffer.  Writing each
        # chunk directly into the buffer avoids the quadratic cost of
        # repeated string concatenation.
        f_ob = StringIO()
        for f in files:
            f_ob.write(S3().get_key_string(f.bucket.name, f.key))
        f_ob.seek(0)

        reader = csv.DictReader(f_ob, self.columns, delimiter=self.delimiter)
        lengths = {}
        i = 0
        for row in reader:
            for col in self.columns:
                try:
                    lengths[col] = max(lengths.get(col, 0), len(row[col]))
                except KeyError:
                    # Row is missing an expected column; report and move on.
                    print('Key error:', col)
                    print(row)
            i += 1
        print(json.dumps(lengths, indent=4))
        print('Rows:', i)

    def add_file_to_archive(self, key):
        """Copies a file from source_bucket to archive_bucket.
        Expects Boto Key.
        """
        copy_s3_object(key.bucket.name, key.key, self.archive_bucket, key.key)

    def load(self, all_files=False):
        """COPYs source data into Redshift and archives the loaded files.

        Args:
            all_files: if True, load the entire source prefix with a single
                COPY command; otherwise load each not-yet-archived file
                individually.
        """
        if all_files:
            # One COPY over the whole prefix, then archive everything.
            self.database.copy_s3_file_to_table(
                self.table_name,
                self.source_bucket,
                self.source_prefix,
                column_list=','.join(self.columns),
                seperator=self.delimiter,  # (sic) keyword as defined by Database
                errors=0)
            for f in self.source_files:
                self.add_file_to_archive(f)
        else:
            for f in self.new_files:
                self.database.copy_s3_file_to_table(
                    self.table_name,
                    self.source_bucket,
                    f.key,
                    column_list=','.join(self.columns),
                    seperator=self.delimiter,
                    errors=0)
                # Archive immediately after a successful COPY so a later
                # failure does not cause this file to be re-loaded.
                self.add_file_to_archive(f)
if __name__ == '__main__':
    # Default run: load only the files that have not yet been archived.
    loader = Mex360()
    loader.load(all_files=False)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement