# -*- coding: utf-8 -*-
"""
Created on Mon Oct 02 17:28:39 2017

@author: Nathan Knox
"""
import glob
import hashlib
import os.path
import re
import subprocess

import pandas as pd


class IIDRGlobCleaner:
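    """Parse IIDR change-data files for a WGS segment: read the segment's
    META file, map source datatypes to Hive types, hash the primary-key
    columns of each record, add load metadata, keep only the post-image
    columns, and write a pipe-delimited CSV that is pushed to HDFS stage."""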
    def hivetype_convert(self, srctypes, srcname):
        # srctypes is a list of source datatypes, srcname is the source name
        # Work on a copy so the caller's list is not modified in place
        hivetypes = list(srctypes)
        if srcname in ("db2", "teradata"):
            for i in range(len(srctypes)):
                varchar_rep = re.sub("CHAR", "VARCHAR", srctypes[i])
                int_rep = re.sub("SMALLINT|INTEGER|BYTEINT",
                                 "INT", varchar_rep)
                hivetypes[i] = int_rep
        elif srcname == "oracle":
            for i in range(len(srctypes)):
                varchar_rep = re.sub("CHAR|NCHAR|VARCHAR2|NVARCHAR2",
                                     "VARCHAR", srctypes[i])
                int_rep = re.sub("SMALLINT|INTEGER|BYTEINT",
                                 "INT", varchar_rep)
                numeric_rep = re.sub("NUMERIC", "BIG INT", int_rep)
                hivetypes[i] = numeric_rep
        return hivetypes

    def get_from_inbound(self, source, target):
        if os.path.isdir(target):
            print "Target directory exists, proceeding..."
        else:
            os.makedirs(target)
            print "Target directory created, proceeding..."
        # Remove contents of target folder (expand the pattern ourselves;
        # "rm" launched without a shell will not expand wildcards)
        print "Removing contents of %s..." % target
        for leftover in glob.glob(target + '*.R*'):
            os.remove(leftover)
        get = subprocess.Popen(["hadoop", "fs", "-copyToLocal", source + '*', target],
                               stdin=subprocess.PIPE, bufsize=-1)
        get.stdin.close()
        print "%s moved to %s." % (source, target)

    def move_to_stage(self, source, target):
        put = subprocess.Popen(["hadoop", "fs", "-put", source, target],
                               stdin=subprocess.PIPE, bufsize=-1)
        put.stdin.close()
        print "%s moved to %s." % (source, target)

    def compute_SHA_hash(self, string):
        m = hashlib.sha1()
        m.update(string)
        return m.hexdigest()

    def hash_cols(self, row, inputcols):
        colsinrow = list(row[inputcols])
        stringed = [str(i) for i in colsinrow]
        concat_stringed = '~'.join(stringed)
        return self.compute_SHA_hash(concat_stringed)

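    # Column 2 of each record is the IIDR operation code (TRNSCTN_CD):
    # 'I' = insert, 'U' = update, 'D' = delete. Insert rows are hashed
    # from the "(POST)"-prefixed post-image key columns; update and
    # delete rows are hashed from the pre-image key columns.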
    def pk_hash(self, dataframe, cols_to_hash):
        hashed_cols = []
        for i in range(len(dataframe.index)):
            postcols = ["(POST)" + x for x in cols_to_hash]
            temprow = dataframe.iloc[i, :]
            if temprow[2] == 'I':
                # Insert logic
                hashed_cols.append(self.hash_cols(temprow, postcols))
            elif temprow[2] == 'U' or temprow[2] == 'D':
                # Update/Delete logic
                hashed_cols.append(self.hash_cols(temprow, cols_to_hash))
            else:
                print "Invalid operator"
                break
        # Insert hashed column as the 1st column (before IIDR meta)
        dataframe.insert(loc=0, column='PKHASH_KEY', value=hashed_cols)

    def run(self):
        counter = 1

        # Get source files to the edge node directory
        # get_from_inbound(hdfsinput + '*', path)
        # read in meta file
        meta = pd.read_csv(self.metafilein, sep='|')

        # slice columns: name, datatype, pk flag, index flag, comment
        cols = list(meta.iloc[:, 0])
        types = list(meta.iloc[:, 1])
        pk = list(meta.iloc[:, 2])
        ind = list(meta.iloc[:, 3])
        comment = list(meta.iloc[:, 4])

        iidr_cols = ["LOAD_DTM", "LOAD_INGSTN_ID", "TRNSCTN_CD", "GUID"]
        # list of IIDR data frame column names: pre-image columns first,
        # then the "(POST)"-prefixed post-image columns
        for i in range(len(cols)):
            iidr_cols.append(cols[i])
        for i in range(len(cols)):
            iidr_cols.append("(POST)" + cols[i])

        # overwrite types to hive types
        meta.iloc[:, 1] = self.hivetype_convert(types, 'db2')

        # data parsing
        if os.path.isfile(self.csvout):
            print "File already exists, removing..."
            os.remove(self.csvout)
        for f in glob.glob(self.path + '*.D20*'):
            print f
            for chunk in pd.read_csv(f, sep='|', header=None, chunksize=1000):
                fname = f.split('/')[-1].split('.')[0]
                print fname
                dbnum = fname[2:4]
                # imsdb = 'GNCH%sDB' % (dbnum) -- Applicable for GNC databases only
                imsdb = 'ICLH%sDB' % (dbnum)
                # imsdb = 'GNCDDCDB'
                print imsdb
                load_log_key = []  # IIDR file name, i.e. "DB01ACMP.D2017270.T132543871"
                sor_cd = []  # database name, i.e. "WGS"
                database_nm = []  # IMS only, i.e. "DB01ACMP"
                chunk.columns = iidr_cols
                # PK generation section
                # List comprehension to pull pk hash column names
                pk_hash_cols = [cols[x] for x in range(len(cols)) if pk[x] == '<pk>']
                # Add dbname
                pk_hash_cols.append(imsdb)
                # Iterate, create pk hash, add column to df
                # For inserts, hash the post-image pks; for updates/deletes, use the pre-image
                self.pk_hash(chunk, pk_hash_cols)
                # Populate meta lists
                for i in range(len(chunk.index)):
                    load_log_key.append(f.split('/')[-1])
                    sor_cd.append('WGS')
                    database_nm.append(imsdb)
                # Insert metadata
                chunk.insert(loc=1, column='LOAD_LOG_KEY', value=load_log_key)
                chunk.insert(loc=2, column='SOR_CD', value=sor_cd)
                chunk.insert(loc=3, column='DATABASE_NM', value=database_nm)

                # print chunk.head()
                # cut out the pre-image: keep the metadata columns plus the post-image half
                postseq = range(len(chunk.columns) // 2 + 4, len(chunk.columns))
                postlist = [0, 1, 2, 3, 4, 5, 6, 7] + postseq
                postimage = chunk.iloc[:, postlist]
                # write chunk to file
                postimage.to_csv(self.csvout, sep='|', header=None, index=False, mode='a')
                print "Chunk %d completed, proceeding..." % (counter)
                counter += 1
        finalfile = pd.read_csv(self.csvout, sep='|', header=None)
        print "Parsing complete.\nOutput shape: %s" % (str(finalfile.shape))
        # Move finished CSV to HDFS Stage folder
        self.move_to_stage(self.csvout, self.hdfsoutput)

    def __init__(self):
        print("Instantiating")
        self.user_in = raw_input("What segment do you want to process? (gnchist, etc.)\n:")
        self.csvout = "/data/01/dv/app/ve2/bdf/rawz/phi/gbd/r000/etl/python/parsed_files/%s.csv" % self.user_in.upper()
        # csvout = '/home/af49982/GNCHIST.csv'
        self.path = "/data/01/dv/app/ve2/bdf/rawz/phi/gbd/r000/etl/python/files_to_parse/%s/" % self.user_in.lower()
        # path = '/home/af49982/gnchist/'
        self.metafilein = self.path + '%s_META.csv' % self.user_in.upper()
        self.hdfsinput = "/dv/hdfsdata/ve2/bdf/rawz/phi/no_gbd/r000/inbound/claims/wgs/%s/" % self.user_in.lower()
        self.hdfsoutput = "/dv/hdfsdata/ve2/bdf/rawz/phi/no_gbd/r000/stage/claims/wgs/%s/" % self.user_in.lower()


if __name__ == '__main__':
    print("Starting Glob Cleaner")
    cleaner = IIDRGlobCleaner()
    cleaner.run()