# -*- coding: utf-8 -*-
"""
Created on Mon Oct 02 17:28:39 2017
@author: Nathan Knox
"""
import glob
import hashlib
import os.path
import re
import subprocess

import pandas as pd


class IIDRGlobCleaner:
    def hivetype_convert(self, srctypes, srcname):
        # srctypes is a list of source datatypes, srcname is the source name
        hivetypes = list(srctypes)  # copy so the caller's list is not mutated
        if srcname in ("db2", "teradata"):
            for i in range(len(srctypes)):
                # \b keeps a bare CHAR from also matching inside VARCHAR
                varchar_rep = re.sub(r"\bCHAR\b", "VARCHAR", srctypes[i])
                int_rep = re.sub("SMALLINT|INTEGER|BYTEINT",
                                 "INT", varchar_rep)
                hivetypes[i] = int_rep
        elif srcname == "oracle":
            for i in range(len(srctypes)):
                varchar_rep = re.sub(r"\bCHAR\b|NCHAR|VARCHAR2|NVARCHAR2",
                                     "VARCHAR", srctypes[i])
                int_rep = re.sub("SMALLINT|INTEGER|BYTEINT",
                                 "INT", varchar_rep)
                # Hive spells its 8-byte integer type BIGINT, not "BIG INT"
                numeric_rep = re.sub("NUMERIC", "BIGINT", int_rep)
                hivetypes[i] = numeric_rep
        return hivetypes
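
    # Example for hivetype_convert (type values are illustrative, not taken
    # from a real meta file):
    #   hivetype_convert(["CHAR(10)", "SMALLINT"], "db2")
    #   -> ["VARCHAR(10)", "INT"]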
    def get_from_inbound(self, source, target):
        if os.path.isdir(target):
            print "Target directory exists, proceeding..."
        else:
            os.makedirs(target)
            print "Target directory created, proceeding..."
        # Remove contents of target folder; shell=True so the *.R* glob is
        # expanded (a list-form Popen would pass '*.R*' to rm literally)
        print "Removing contents of %s..." % target
        subprocess.Popen("rm " + target + "*.R*", shell=True).wait()
        get = subprocess.Popen(["hadoop", "fs", "-copyToLocal", source + '*', target],
                               stdin=subprocess.PIPE, bufsize=-1)
        get.stdin.close()
        get.wait()  # block until the copy finishes before reporting success
        print "%s moved to %s." % (source, target)
    def move_to_stage(self, source, target):
        put = subprocess.Popen(["hadoop", "fs", "-put", source, target],
                               stdin=subprocess.PIPE, bufsize=-1)
        put.stdin.close()
        put.wait()  # block until the upload finishes before reporting success
        print "%s moved to %s." % (source, target)
    def compute_SHA_hash(self, string):
        m = hashlib.sha1()
        m.update(string)
        return m.hexdigest()
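
    # compute_SHA_hash returns the 40-character hex SHA-1 digest of the joined
    # key string, so the same input row always yields the same PKHASH_KEY.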
    def hash_cols(self, row, inputcols, salt=None):
        colsinrow = list(row[inputcols])
        if salt is not None:
            colsinrow.append(salt)  # mix the database name into the key
        stringed = [str(i) for i in colsinrow]
        concat_stringed = '~'.join(stringed)
        return self.compute_SHA_hash(concat_stringed)
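
    # Illustrative example (the column names and salt value are assumptions):
    # for a row where CLM_ID=1 and MBR_ID=7,
    #   hash_cols(row, ['CLM_ID', 'MBR_ID'], salt='ICLH01DB')
    # hashes the string "1~7~ICLH01DB".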
    def pk_hash(self, dataframe, cols_to_hash, salt=None):
        hashed_cols = []
        postcols = ["(POST)" + x for x in cols_to_hash]
        for i in range(len(dataframe.index)):
            temprow = dataframe.iloc[i, :]
            op = temprow.iloc[2]  # TRNSCTN_CD: 'I'nsert, 'U'pdate, 'D'elete
            if op == 'I':
                # Insert logic: hash the post-image key columns
                hashed_cols.append(self.hash_cols(temprow, postcols, salt))
            elif op in ('U', 'D'):
                # Update/Delete logic: hash the pre-image key columns
                hashed_cols.append(self.hash_cols(temprow, cols_to_hash, salt))
            else:
                print "Invalid operator"
                break
        # Insert hashed column as the 1st column (before IIDR meta)
        dataframe.insert(loc=0, column='PKHASH_KEY', value=hashed_cols)
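
    # Design note: salting with the IMS database name keeps identical key
    # values in two databases (e.g. ICLH01DB vs ICLH02DB) from landing on the
    # same PKHASH_KEY.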
    def run(self):
        counter = 1
        # Get source files to the edge node directory
        # self.get_from_inbound(self.hdfsinput, self.path)
        # read in meta file
        meta = pd.read_csv(self.metafilein, sep='|')
        # slice columns (ind/comment are carried for reference only)
        cols = list(meta.iloc[:, 0])
        types = list(meta.iloc[:, 1])
        pk = list(meta.iloc[:, 2])
        ind = list(meta.iloc[:, 3])
        comment = list(meta.iloc[:, 4])
        # IIDR data frame column names: meta columns, then the pre-image
        # columns, then the post-image "(POST)" columns
        iidr_cols = ["LOAD_DTM", "LOAD_INGSTN_ID", "TRNSCTN_CD", "GUID"]
        for i in range(len(cols)):
            iidr_cols.append(cols[i])
        for i in range(len(cols)):
            iidr_cols.append("(POST)" + cols[i])
        # overwrite source types with Hive types
        meta.iloc[:, 1] = self.hivetype_convert(types, 'db2')
        # data parsing
        if os.path.isfile(self.csvout):
            print "File already exists, removing..."
            os.remove(self.csvout)
        for f in glob.glob(self.path + '*.D20*'):
            print f
            for chunk in pd.read_csv(f, sep='|', header=None, chunksize=1000):
                fname = f.split('/')[-1].split('.')[0]
                print fname
                dbnum = fname[2:4]
                # imsdb = 'GNCH%sDB' % dbnum  # applicable for GNC databases only
                imsdb = 'ICLH%sDB' % dbnum
                # imsdb = 'GNCDDCDB'
                print imsdb
                load_log_key = []  # IIDR file name, e.g. "DB01ACMP.D2017270.T132543871"
                sor_cd = []  # system-of-record code, e.g. "WGS"
                database_nm = []  # IMS only, e.g. "DB01ACMP"
                chunk.columns = iidr_cols
                # PK generation section
                # List comprehension to pull pk hash column names
                pk_hash_cols = [cols[x] for x in range(len(cols)) if pk[x] == '<pk>']
                # Create the pk hash column, salted with the database name.
                # For inserts, hash the post-image pks; for updates/deletes,
                # use the pre-image pks.
                self.pk_hash(chunk, pk_hash_cols, imsdb)
                # Populate meta lists
                for i in range(len(chunk.index)):
                    load_log_key.append(f.split('/')[-1])
                    sor_cd.append('WGS')
                    database_nm.append(imsdb)
                # Insert metadata
                chunk.insert(loc=1, column='LOAD_LOG_KEY', value=load_log_key)
                chunk.insert(loc=2, column='SOR_CD', value=sor_cd)
                chunk.insert(loc=3, column='DATABASE_NM', value=database_nm)
                # print chunk.head()
                # cut out the pre-image: keep the 8 meta columns plus the
                # post-image half of the frame
                postseq = range(len(chunk.columns) // 2 + 4, len(chunk.columns))
                postlist = [0, 1, 2, 3, 4, 5, 6, 7] + postseq
                postimage = chunk.iloc[:, postlist]
                # append chunk to the output file
                postimage.to_csv(self.csvout, sep='|', header=None, index=False, mode='a')
                print "Chunk %d completed, proceeding..." % counter
                counter += 1
        finalfile = pd.read_csv(self.csvout, sep='|', header=None)
        print "Parsing complete.\nOutput shape: %s" % str(finalfile.shape)
        # Move finished CSV to the HDFS stage folder
        self.move_to_stage(self.csvout, self.hdfsoutput)
    def __init__(self):
        print("Instantiating")
        self.user_in = raw_input("What segment do you want to process? (gnchist, etc.)\n:")
        self.csvout = "/data/01/dv/app/ve2/bdf/rawz/phi/gbd/r000/etl/python/parsed_files/%s.csv" % self.user_in.upper()
        # csvout = '/home/af49982/GNCHIST.csv'
        self.path = "/data/01/dv/app/ve2/bdf/rawz/phi/gbd/r000/etl/python/files_to_parse/%s/" % self.user_in.lower()
        # path = '/home/af49982/gnchist/'
        self.metafilein = self.path + '%s_META.csv' % self.user_in.upper()
        self.hdfsinput = "/dv/hdfsdata/ve2/bdf/rawz/phi/no_gbd/r000/inbound/claims/wgs/%s/" % self.user_in.lower()
        self.hdfsoutput = "/dv/hdfsdata/ve2/bdf/rawz/phi/no_gbd/r000/stage/claims/wgs/%s/" % self.user_in.lower()

if __name__ == '__main__':
    print("Starting Glob Cleaner")
    cleaner = IIDRGlobCleaner()
    cleaner.run()
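
# A minimal sketch of the expected META file layout (pipe-delimited; the
# header names here are assumptions, only the column order and the '<pk>'
# marker are inferred from run()):
#   COLUMN_NM|DATA_TYPE|PK_IND|IND|COMMENT
#   CLM_ID|INTEGER|<pk>||
#   SRVC_CD|CHAR(10)|||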