Guest User

Untitled

a guest
Mar 31st, 2018
329
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.44 KB | None | 0 0
  1. #!/bin/env python
  2. #transfer teradata table to hive
  3. # - assumes bd user is the same as teradata user. note the LOAD INPATH reference to user
  4.  
  5. import sys
  6. import re
  7. import datetime
  8. import argparse
  9. from argparse import RawTextHelpFormatter
  10. import smtplib
  11. import getpass
  12. import socket,time
  13. DatalabName = "datalab"
  14. multidelimit=False
  15. FieldDelimiter = '^'
  16. if multidelimit:
  17. FieldDelimiter = '^~'
  18. e2hFolder='/data/e2h/'
  19. notify_email = True
  20.  
  21.  
  22. edwToHiveTypeMapping={
  23. "TIME":"STRING",
  24. "CHAR":"STRING",
  25. "VARCHAR":"STRING",
  26. "DATE":"DATE",
  27. "FLOAT":"DOUBLE",
  28. "INTEGER":"INT",
  29. "BYTEINT":"TINYINT",
  30. "SMALLINT":"SMALLINT",
  31. "BIGINT":"BIGINT",
  32. "NUMBER":"DECIMAL",
  33. "TIMESTAMP":"TIMESTAMP",
  34. "DECIMAL":"DECIMAL"
  35. }
  36.  
  37. import subprocess, shlex
  38. from pprint import pprint
  39.  
  40. def run_cmd(cmd):
  41. ''' runs the command and returns exit status '''
  42. p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash')
  43. output = p.communicate()[0]
  44. return output,p.returncode
  45.  
  46. fromaddr = getpass.getuser()+"@"+socket.gethostname()
  47. to = ['paolo.villaflores@team.com']
  48.  
  49. def alert(alert_text, alert_body):
  50.  
  51. msg = 'from:'+fromaddr+'\r\n'
  52. msg += 'Subject:'+alert_text+' at ' +str(datetime.now())[:16] +'\r\n'
  53. msg += '\r\n\r\n\r\n' + alert_body.replace('\n','\r\n')
  54. s = smtplib.SMTP()
  55. s.connect('localhost')
  56. s.sendmail(fromaddr, to , 'To: ' + ','.join(to) + '\r\n' + msg + '\r\nFinished\r\n\r\n' )
  57. s.close()
  58.  
  59.  
  60. parser = argparse.ArgumentParser(description="transfer edw table to hive", formatter_class=RawTextHelpFormatter)
  61. parser.add_argument('-e', "--edwtable", type=str, help='EDW table', required=True )
  62. parser.add_argument('-d', "--edwdb", type=str, help='EDW database', required=True )
  63. parser.add_argument('-b', "--bdtable", type=str, help='BD table', required=True )
  64. parser.add_argument('-l', "--logmech", type=str, help='logmech', required=True )
  65. parser.add_argument('-u', "--user", type=str, help='user', required=True )
  66. parser.add_argument('-p', "--password", type=str, help='password', required=True )
  67. parser.add_argument('-r', "--hiveUser", type=str, help='BD user' )
  68. parser.add_argument('-t', "--teraCreateUser", type=str, help='teradata create user' )
  69. parser.add_argument('-x', "--teraCreateUserLogMech", type=str, help='teradata create user logmech' )
  70. parser.add_argument('-v', "--teraCreateUserPassword", type=str, help='teradata create user pass' )
  71. parser.add_argument('-c', "--credentialsHivePassword", type=str, help='BD password' )
  72. parser.add_argument('-s', "--teradataserver", type=str, help='Teradata server', required=True )
  73. parser.add_argument('-w', "--where", type=str, help='where clause EDW' )
  74.  
  75. args = parser.parse_args()
  76.  
  77. hiveuser = args.user
  78. if args.hiveUser :
  79. hiveuser = args.hiveUser
  80.  
  81. teraCreateUser=args.user
  82. if args.teraCreateUser :
  83. teraCreateUser = args.teraCreateUser
  84.  
  85. Teradata_Server = args.teradataserver
  86. LogMech = ".LOGMECH "+args.logmech+"\n"
  87.  
  88. teraCreateUserLogMech = LogMech
  89. if args.teraCreateUser :
  90. teraCreateUserLogMech = ".LOGMECH "+args.teraCreateUserLogMech+"\n"
  91.  
  92. teraCreateUserPassword = args.password
  93. if args.teraCreateUser :
  94. teraCreateUserPassword = args.teraCreateUserPassword
  95.  
  96.  
  97. def okBteqErr(retval, o):
  98. #a = 'Read from remote host ' in o and ': Connection timed out' in o
  99. return ('*** Ok, Session ' in o and '*** Query completed. ' in o and 'One column returned.' in o)
  100.  
  101.  
  102. def ls_check(outfile):
  103.  
  104. cmd2 = "ls -l " + e2hFolder +outfile
  105. o,retval = run_cmd(cmd2)
  106.  
  107. if retval != 0:
  108. print 'retcode:',retval
  109. print o
  110. raise Exceoption, "Non-zero exit code on performing ls. Please review output."
  111.  
  112. cmd2 = "kinit -kt ~/user.kt -p "+hiveuser+" ;hdfs dfs -ls " +outfile
  113. o2,retval2 = run_cmd(cmd2)
  114.  
  115. if retval2 != 0:
  116. print 'retcode:',retval2
  117. print o2
  118. raise Exception, "Non-zero exit code on performing hdfs -ls. Please review output."
  119.  
  120. q = o.strip().split("\n")[-1]
  121. q2 = o2.strip().split("\n")[-1]
  122.  
  123. r = q.strip("\n").split(None)
  124. r2 = q2.strip("\n").split(None)
  125.  
  126. # check that the output of ls have same size
  127. if r[4] != r2[4]:
  128. raise Exception, "file sizes inconsistent after dfs put. ls size: " + r[4] + "hdfs size: "+r2[4]
  129.  
  130.  
  131.  
  132. cmdLine = """bteq << EOF
  133. .SESSIONS 4
  134. .SET ERROROUT STDOUT;
  135. """ + teraCreateUserLogMech + ".LOGON " + Teradata_Server + "/"+teraCreateUser+","+ teraCreateUserPassword+ """;
  136. .SET WIDTH 9999
  137.  
  138.  
  139. /*Setting format of output file*/
  140. .SET RECORDMODE OFF;
  141. .SET FORMAT OFF ;
  142. .SET TITLEDASHES OFF;
  143. .SET SEPARATOR ',';
  144. CREATE TABLE """+DatalabName+".Tmp"+args.bdtable+" AS ( select * from "+args.edwdb+"."+args.edwtable+""" sample 1 ) WITH NO DATA;
  145. .quit
  146. EOF"""
  147.  
  148. retries = 0
  149. while True:
  150. o,retval = run_cmd(cmdLine)
  151.  
  152. if '*** Error: Logon failed!' in o:
  153. # retry because this has been observed to fail even when login is correct.
  154. if retries < 5:
  155. retries += 1
  156. time.sleep(300)
  157. continue
  158. else:
  159. raise Exception, "Bteq Logon failed on create temp table: "+args.edwdb+"."+args.edwtable+"@" + Teradata_Server
  160.  
  161. if retval != 0 and not okBteqErr(retval, o):
  162. print o
  163. raise Exception, "Non-zero exit code on performing teradata bteq exec CREATE TABLE."
  164. break
  165.  
  166.  
  167.  
  168. #########################################
  169. # Now we do show table to get the SCHEMA
  170. #########################################
  171.  
  172.  
  173. cmdLine = """bteq << EOF
  174. .SESSIONS 4
  175. .SET ERROROUT STDOUT;
  176. """ + teraCreateUserLogMech + ".LOGON " + Teradata_Server + "/"+teraCreateUser+","+ teraCreateUserPassword+ """;
  177. .SET WIDTH 9999
  178.  
  179.  
  180. /*Setting format of output file*/
  181. .SET RECORDMODE OFF;
  182. .SET FORMAT OFF ;
  183. .SET TITLEDASHES OFF;
  184. .SET SEPARATOR ',';
  185. SHOW TABLE """+DatalabName+".Tmp"+args.bdtable+""";
  186. DROP TABLE """+DatalabName+".Tmp"+args.bdtable+""";
  187. .quit
  188. EOF"""
  189.  
  190. retries = 0
  191. while True:
  192. o,retval = run_cmd(cmdLine)
  193.  
  194. if '*** Error: Logon failed!' in o:
  195. # retry because this has been observed to fail even when login is correct.
  196. if retries < 5:
  197. retries += 1
  198. time.sleep(300)
  199. continue
  200. else:
  201. raise Exception, "Bteq Logon failed on show table: "+args.edwdb+"."+args.edwtable+"@" + Teradata_Server
  202.  
  203. if retval != 0 and not okBteqErr(retval, o):
  204. print o
  205. raise Exception, "Non-zero exit code on performing teradata bteq SHOW TABLE."
  206. break
  207.  
  208. # parse the show table output
  209. state = 0
  210. fields1=[]
  211. select_fields=""
  212. f_Str = ""
  213. Sel_Expr=""
  214. for i in o.split('\n'):
  215. if state == 0 and 'CREATE SET TABLE' in i:
  216. state = 1
  217. elif state == 1 and '(' == i.strip():
  218. state = 2
  219. elif state == 2:
  220. if 'PRIMARY ' in i or i.strip()[-1] == ';':
  221. break
  222. r = i.strip().split(None)
  223. ds = r[1].strip()
  224. if ds[-1] == ',':
  225. ds = ds[:-1]
  226. k1 = k2 = 0
  227. for i in ds:
  228. if i == '(':
  229. k1+=1
  230. if i == ')':
  231. k2+=1
  232. while k2>k1 and ds[-1] == ')':
  233. ds = ds[:-1]
  234. k2 -=1
  235.  
  236. fieldName = r[0]
  237. datatypeString = ds
  238. fieldType = r[1].split('(')[0].split(',')[0].strip(')')
  239.  
  240. HT = edwToHiveTypeMapping[ fieldType ]
  241.  
  242. if fieldType == "DECIMAL" :
  243. HT = datatypeString
  244.  
  245. if fieldType == "NUMBER" :
  246. HT = datatypeString.replace("NUMBER","DECIMAL")
  247.  
  248. if fieldType == "TIME" :
  249. # Hive does not have a good match.Only DATE and TIMESTAMP
  250. # sample value: 20:15:14
  251. # encountered in apollo.cdr DWPRDE_VWXP_APOLLO.CDR rtng_event_time field
  252. # so...
  253. HT = "STRING"
  254.  
  255. fields1.append([fieldName,fieldType,datatypeString,HT] )
  256.  
  257. # Build SELECT for BD CREATE
  258. select_fields += " `" + fieldName.lower() + "`" + " " + HT+",\n"
  259. if len(fieldName)>128 :
  260. raise Exception, "FieldName cannot be more than 128 characters long: "+fieldName
  261.  
  262. ##########################
  263. # Build EDW Sel_Expr
  264. ##########################
  265. if fieldType != "TIMESTAMP" and fieldType != "DATE" :
  266. f_Str += "\nTRIM("
  267.  
  268. # Handle string
  269. #fld = "'\"'||" + "COALESCE(" + 'FieldName' + ",'*NULL*')" + "||'\"'"
  270. #fld = "COALESCE(" + 'FieldName' + ",'*NULL*')"
  271. fld = "COALESCE(TRANSLATE(" + fieldName + " USING LATIN_TO_UNICODE WITH ERROR),'*NULL*')"
  272.  
  273.  
  274. if fieldType == "BIGINT" :
  275. f_Str = f_Str + "COALESCE(CAST(" + fieldName + " AS BIGINT),'?')"
  276. elif fieldType == "TIMESTAMP" :
  277. f_Str = f_Str + "COALESCE(CAST(CAST(" + fieldName + " as format 'YYYY-MM-DDbHH:MI:SS') AS VARCHAR(19)),'?')"
  278. elif fieldType == "DATE" :
  279. f_Str = f_Str + "COALESCE(CAST(CAST(" + fieldName + " as format 'YYYY-MM-DD') AS VARCHAR(10)),'?')"
  280. elif fieldType == "INTEGER" or fieldType == "BYTEINT" or fieldType == "SMALLINT" or fieldType == "FLOAT" or fieldType == "DECIMAL" or fieldType == "NUMBER" :
  281. f_Str = f_Str + "COALESCE(" + fieldName + ",'?')"
  282.  
  283. elif fieldType == "TIME" :
  284. # Hive does not have a good match.Only DATE and TIMESTAMP
  285. # sample value: 20:15:14
  286. # encountered in apollo.cdr DWPRDE_VWXP_APOLLO.CDR rtng_event_time field
  287. # so...
  288. # HT = STRING
  289. f_Str = f_Str + "COALESCE(CAST(" + fieldName + " AS VARCHAR(8)),'*NULL*')"
  290. else :
  291. f_Str = f_Str + fld
  292.  
  293. if fieldType <> "TIMESTAMP" and fieldType <> "DATE" :
  294. Sel_Expr = f_Str + ") (TITLE '')"
  295. f_Str = f_Str + ")||'"+FieldDelimiter+"'||"
  296. else:
  297. Sel_Expr = f_Str + " (TITLE '')"
  298. f_Str = f_Str + "||'"+FieldDelimiter+"'||"
  299. ##########################
  300. # End of Build EDW Sel_Expr
  301. ##########################
  302.  
  303. select_fields=select_fields.rstrip().rstrip(',')+'\n'
  304.  
  305. print 'The select_fields for BD CREATE:'
  306. print select_fields
  307. print '\n\n'
  308. print 'The Select expression for EDW export:'
  309. print Sel_Expr
  310. print '\n\n'
  311.  
  312. if len(args.bdtable) > 128:
  313. raise Exception, "HiveTable name cannot be more than 128 characters long: "+args.bdtable
  314.  
  315. ######################$$$$$
  316. # Now run the bteq export
  317. ######################$$$$$
  318. print 'bteq export started of',args.edwdb+'.'+args.edwtable,'to bdtable:',args.bdtable
  319. outfile= "e2h_"+args.bdtable+'_'+ str(datetime.datetime.now()).replace(' ','').replace(':','')
  320.  
  321. where_c=''
  322. if args.where:
  323. where_c='\nWHERE '+args.where
  324.  
  325. cmdLine = """ bteq << EOF
  326. .SESSIONS 4
  327. .SET ERROROUT STDOUT;
  328. """ + LogMech + ".LOGON " + Teradata_Server + "/"+args.user+","+ args.password+ """;
  329. .SET WIDTH 9999
  330. .EXPORT RESET;
  331. .EXPORT REPORT FILE = """ + e2hFolder + outfile + """
  332.  
  333.  
  334. /*Setting format of output file*/
  335. .SET RECORDMODE OFF;
  336. .SET FORMAT OFF ;
  337. .SET TITLEDASHES OFF;
  338. .SET SEPARATOR ',';
  339. SELECT """ + Sel_Expr + " FROM " + args.edwdb+"."+args.edwtable + where_c+ """;
  340. .EXPORT RESET
  341. .quit
  342. EOF"""
  343. retries = 0
  344. while True:
  345. o,retval = run_cmd(cmdLine)
  346.  
  347. if '*** Error: Logon failed!' in o:
  348. # retry because this has been observed to fail even when login is correct.
  349. if retries < 5:
  350. retries += 1
  351. time.sleep(300)
  352. continue
  353. else:
  354. raise Exception, "Bteq Logon failed on export table: "+args.edwdb+"."+args.edwtable+"@" + Teradata_Server
  355.  
  356. if retval != 0 and not okBteqErr(retval, o):
  357. print o
  358. raise Exception, "Non-zero exit code on performing teradata export table."
  359. break
  360.  
  361. ######################$$$$$
  362. # End of run the bteq export
  363. ######################$$$$$
  364.  
  365. cmdLine = "/sbin/fuser " + e2hFolder + outfile
  366.  
  367. o,retval = run_cmd(cmdLine)
  368. print 'fuser result:',o
  369.  
  370. ############ check bteq export
  371. if retval == 1: # 1 returned when there is no process
  372. print 'No process running. Good.'
  373.  
  374. elif retval != 0:
  375. print o
  376. raise Exception, "Non-zero exit code on performing fuser. Please review output."
  377.  
  378. else:
  379. q = o.strip().split('\n')[-1]
  380. r = q.split(None)
  381. pid = None
  382.  
  383. if len(r) == 0 or not r[-1].isdigit():
  384. # handle the case where the pid is in front
  385. r = q.replace(e2hFolder+ outfile+':', '').split(None)
  386. if len(r) == 0 or not r[-1].isdigit():
  387. pid=None
  388. else:
  389. pid = int(r[-1])
  390.  
  391.  
  392.  
  393. cmdLine = "pgrep bteq"
  394.  
  395. while pid:
  396.  
  397. if retval == 1: # 1 returned when there is no match pgrep
  398. break
  399.  
  400. if retval != 0:
  401. print 'output:',o
  402. raise Exception, "Non-zero exit code on performing pgrep. Please review output"
  403. break
  404.  
  405. q = o.strip().split('\n')
  406. r = []
  407. if len(q)>0:
  408. r = [ int(x.strip()) for x in q if x.strip().isdigit() ]
  409.  
  410. if len(r) == 0:
  411. break
  412.  
  413. if pid in r:
  414. # verified there is a running bteq
  415. time.sleep(300) # wait 5 minutes
  416. continue
  417.  
  418. if notify_email:
  419. alert('bteq check out finished',args.edwtable)
  420.  
  421. break
  422.  
  423.  
  424. ############ end of check bteq export
  425.  
  426. print 'Finished bteq export to '+outfile
  427.  
  428. cmdLine = ". ~/.bashrc; kinit -kt ~/user.kt -p "+hiveuser+"; hdfs dfs -put "+ e2hFolder + outfile
  429.  
  430. o,retval = run_cmd(cmdLine)
  431.  
  432. if retval != 0 :
  433. print o
  434. raise Exception, "Non-zero exit code on dfs put "+outfile
  435.  
  436. print 'Finished dfs put '+outfile
  437.  
  438. ############ check dfs put
  439.  
  440.  
  441. cmdLine = "/sbin/fuser " + e2hFolder + outfile
  442. if retval == 1: # 1 returned when there is no process
  443. ls_check(outfile)
  444. # we are done
  445.  
  446. if retval != 0:
  447. print 'output:',o
  448. raise Exception, "Non-zero exit code on performing fuser. Please review output."
  449.  
  450.  
  451. q = o.strip().split('\n')[-1]
  452. r = q.split(None)
  453. pid = None
  454. if len(r) == 0 or not r[-1].isdigit():
  455. # handle the case where the pid is in front
  456. r = q.replace(e2hFolder+ outfile+':', '').split(None)
  457. if len(r) == 0 or not r[-1].isdigit():
  458. ls_check(outfile)
  459. pid = None
  460. else:
  461. pid = int(r[-1])
  462.  
  463.  
  464. cmdLine = "pgrep java | grep " + str(pid)
  465. while pid:
  466.  
  467. if retval == 1: # 1 is returned when pid not found by grep
  468. ls_check(outfile)
  469. break
  470.  
  471. if retval != 0:
  472. print 'output:',o
  473. raise Exception, "Non-zero exit code on performing pgrep. Please review output of dfs put."
  474.  
  475. q = o.strip().split('\n')
  476. if len(q)>0:
  477. while not q[0].strip.isdigit():
  478. q = q[1:]
  479. r = [int(i.strip() ) for i in q]
  480.  
  481. if len(r) == 0:
  482. ls_check(outfile)
  483. break
  484.  
  485. if pid in r:
  486. # verified there is a running bteq/dfs put
  487. time.sleep(300) # wait 5 minutes
  488. continue
  489. ls_check(outfile)
  490. break
  491.  
  492. ############ end of check dfs put
  493.  
  494. cmdLine = 'bin/beeline.sh < /tmp/'+outfile
  495.  
  496. BD_DDL = """
  497. use datalab_ra;
  498. DROP TABLE IF EXISTS """+args.bdtable+"""_temp PURGE ;
  499. CREATE TABLE """+args.bdtable+"""_temp (
  500. """+ select_fields+ """
  501. )
  502. ROW FORMAT DELIMITED
  503. FIELDS TERMINATED BY '""" +FieldDelimiter+ """'
  504. --ESCAPED BY '\\'
  505. NULL DEFINED AS '*NULL*'
  506. STORED AS TEXTFILE ;
  507. LOAD DATA INPATH '/user/"""+ hiveuser+'/'+ outfile +"""'
  508. INTO TABLE """ + args.bdtable +"""_temp;
  509.  
  510. DROP TABLE IF EXISTS """+args.bdtable+""" PURGE ;
  511. CREATE TABLE """+args.bdtable+""" AS
  512. SELECT * FROM """+args.bdtable+"""_temp ;
  513. DROP TABLE IF EXISTS """+args.bdtable+"""_temp PURGE ;
  514. """
  515.  
  516. if multidelimit:
  517. # 2017-10-06 I thought I might now use a new clever multidelimiter. But .java file is not avaialble on all nodes in BD cluster.
  518. BD_DDL="""
  519. use datalab_ra;
  520. DROP TABLE IF EXISTS """+args.bdtable+"""_temp PURGE ;
  521.  
  522. CREATE TABLE """+args.bdtable+"""_temp (
  523. """+ select_fields+ '''
  524. )
  525. ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
  526. WITH SERDEPROPERTIES ("field.delim"="'''+FieldDelimiter+ '''",'serialization.null.format'='*NULL*')
  527. STORED AS TEXTFILE;
  528.  
  529. LOAD DATA INPATH '/user/'''+ hiveuser+'/'+ outfile +"""'
  530. INTO TABLE """ + args.bdtable +"""_temp;
  531.  
  532. DROP TABLE IF EXISTS """+args.bdtable+""" PURGE ;
  533. CREATE TABLE """+args.bdtable+""" AS
  534. SELECT * FROM """+args.bdtable+"""_temp ;
  535. DROP TABLE IF EXISTS """+args.bdtable+"""_temp PURGE ;
  536. """
  537.  
  538. with open('/tmp/'+outfile,'w') as f:
  539. f.write(BD_DDL)
  540.  
  541. o,retval = run_cmd(cmdLine)
  542.  
  543. if 'Error' in o or 'FAILED' in o or retval != 0:
  544. print cmdLine
  545. print
  546. print o
  547. raise Exception, "Non-zero exit code on CREATE TABLE "+outfile
  548.  
  549. print 'Done with EDW export of',args.edwdb+'.'+args.edwtable,'to bdtable:',args.bdtable
  550. print 'Final output:',o
Add Comment
Please, Sign In to add comment