Advertisement
Guest User

Untitled

a guest
Aug 25th, 2016
55
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.34 KB | None | 0 0
  1. # Script for pulling MediaExpress 360 data from S3 and loading to Redshift
  2. from __future__ import print_function
  3.  
  4. from os import environ
  5.  
  6. from libs.s3 import S3
  7. from libs.database import Database
  8. from libs.tools import copy_s3_object
  9.  
  10.  
  11. connection = {'host': environ['REDSHIFT_HOST'],
  12.               'user': environ['REDSHIFT_USER'],
  13.               'database': environ['REDSHIFT_DB'],
  14.               'port': environ['REDSHIFT_PORT']}
  15.  
  16.  
  17. class Mex360(object):
  18.     def __init__(self):
  19.         self.database = Database(connection)
  20.         self.table_name = 'public.mex_360'
  21.         self.source_bucket = 'tr-agency-reporting-data'
  22.         self.source_prefix = 'video_360_usage/'
  23.         self.archive_bucket = 'rtv-mex-360'
  24.         self.columns = ['account_id', 'account_name', 'account_country',
  25.                         'account_region', 'user_first_name', 'user_last_name',
  26.                         'user_country', 'user_region', 'delivery_method',
  27.                         'usn', 'version', 'media_type', 'user_action',
  28.                         'event_time', 'slug', 'headline']
  29.         self.delimiter = '\001'
  30.  
  31.         self.source_files = S3().list_bucket(self.source_bucket,
  32.                                              prefix=self.source_prefix)
  33.  
  34.         self.archive_files = [f.key for f in
  35.                               S3().list_bucket(self.archive_bucket)]
  36.  
  37.         self.new_files = [f for f in self.source_files
  38.                           if f.key not in self.archive_files]
  39.  
  40.     def key_to_uri(self, key):
  41.         """Returns a URI string from a Boto Key object.
  42.        """
  43.         return 's3://{}/{}'.format(key.bucket.name, key.key)
  44.  
  45.     def introspect(self, all_time=False):
  46.         """Prints a JSON string containing attribute names and maximum string
  47.           length of that attribute for all CSVs in source data.
  48.        """
  49.         import csv
  50.         import json
  51.         from cStringIO import StringIO
  52.  
  53.         f_ob = StringIO()
  54.         f_string = ''
  55.  
  56.         if all_time:
  57.             files = self.source_files
  58.         else:
  59.             files = self.new_files
  60.  
  61.         for f in files:
  62.             f_string = f_string + S3().get_key_string(f.bucket.name, f.key)
  63.  
  64.         f_ob.write(f_string)
  65.         f_ob.seek(0)
  66.  
  67.         reader = csv.DictReader(f_ob, self.columns, delimiter=self.delimiter)
  68.  
  69.         lengths = {}
  70.  
  71.         i = 0
  72.         for row in reader:
  73.             for col in self.columns:
  74.                 try:
  75.                     l = len(row[col])
  76.                     try:
  77.                         old_l = lengths[col]
  78.                         if l > old_l:
  79.                             lengths[col] = l
  80.                     except KeyError:
  81.                         lengths[col] = l
  82.                 except KeyError:
  83.                     print('Key error:', col)
  84.                     print(row)
  85.             i += 1
  86.  
  87.         print(json.dumps(lengths, indent=4))
  88.         print('Rows:', i)
  89.  
  90.     def add_file_to_archive(self, key):
  91.         """Copies a file from source_bucket to archive_bucket.
  92.           Expects Boto Key.
  93.        """
  94.         copy_s3_object(key.bucket.name, key.key, self.archive_bucket, key.key)
  95.  
  96.     def load(self, all_files=False):
  97.         if all_files:
  98.             files = self.source_files
  99.             self.database.copy_s3_file_to_table(self.table_name,
  100.                                                 self.source_bucket,
  101.                                                 self.source_prefix,
  102.                                                 column_list=','.join(self.columns),
  103.                                                 seperator=self.delimiter,
  104.                                                 errors=0)
  105.             for f in files:
  106.                 self.add_file_to_archive(f)
  107.         else:
  108.             files = self.new_files
  109.             for f in files:
  110.                 self.database.copy_s3_file_to_table(self.table_name,
  111.                                                     self.source_bucket,
  112.                                                     f.key,
  113.                                                     column_list=','.join(self.columns),
  114.                                                     seperator=self.delimiter,
  115.                                                     errors=0)
  116.                 self.add_file_to_archive(f)
  117.  
  118. if __name__ == '__main__':
  119.     mex = Mex360()
  120.     mex.load(all_files=False)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement