#!/usr/bin/env python
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Require exactly seven arguments on the spark-submit command line.
if len(sys.argv) != 8:
    print("Invalid number of args...")
    print("Usage: spark-submit import.py <table> <hivedb> <domain> <port> <mysqldb> <username> <password>")
    sys.exit(1)

table = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]  # expected to carry the JDBC prefix, e.g. jdbc:mysql://host
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

# Read the MySQL table over JDBC into a DataFrame.
df = (sqlContext.read.format("jdbc")
      .option("url", "{}:{}/{}".format(domain, port, mysqldb))
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", table)
      .option("user", username)
      .option("password", password)
      .load())

# Register the DataFrame as a temporary table.
df.registerTempTable("mytempTable")

# Create the Hive table from the temp table.
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb, table))

sc.stop()
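For reference, a minimal sketch of how the script above might be submitted on its own, assuming the connector jar path, host, database, and credentials shown here are all placeholders:

# Hypothetical invocation; jar path, host, database, and credentials are placeholders.
spark-submit --master yarn-client \
  --jars /path/to/mysql-connector-java.jar \
  sql_spark.py employees warehouse_db jdbc:mysql://db-host 3306 sales_db app_user app_pass
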
#!/bin/bash

source /home/$USER/spark/source.sh
[ $# -ne 1 ] && { echo "Usage: $0 <args_file>"; exit 1; }

args_file=$1

TIMESTAMP=$(date "+%Y-%m-%d")
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log
touch "${success_logs}" "${failed_logs}"

# Record each job's exit status; abort the whole run on the first failure.
function log_status
{
    status=$1
    message=$2
    if [ "$status" -ne 0 ]; then
        echo "$(date +"%Y-%m-%d %H:%M:%S") [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
        exit 1
    else
        echo "$(date +"%Y-%m-%d %H:%M:%S") [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
    fi
}

# Launch one Spark job per table name listed in the args file.
while read -r table; do
    spark-submit --name "${table}" --master "yarn-client" \
        --num-executors 2 --executor-memory 6g --executor-cores 1 \
        --conf "spark.yarn.executor.memoryOverhead=609" \
        /home/$USER/spark/sql_spark.py "${table}" "${hivedb}" "${domain}" "${port}" "${mysqldb}" "${username}" "${password}" \
        > "/tmp/logging/${table}.log" 2>&1
    g_STATUS=$?
    log_status $g_STATUS "Spark job ${table} Execution"
done < "${args_file}"

echo "************************************************************************************************************************************************************************"
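The wrapper above sources /home/$USER/spark/source.sh for the connection settings (hivedb, domain, port, mysqldb, username, password) it forwards to spark-submit. A plausible sketch of that file, with every value a placeholder assumption:

#!/bin/bash
# Hypothetical source.sh; all values below are placeholders.
export hivedb="warehouse_db"
export domain="jdbc:mysql://db-host"  # must include the JDBC prefix the Python script expects
export port="3306"
export mysqldb="sales_db"
export username="app_user"
export password="app_pass"
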
#!/usr/bin/env python
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Require exactly seven arguments on the spark-submit command line.
if len(sys.argv) != 8:
    print("Invalid number of args...")
    print("Usage: spark-submit import.py <args_file> <hivedb> <domain> <port> <mysqldb> <username> <password>")
    sys.exit(1)

args_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]  # expected to carry the JDBC prefix, e.g. jdbc:mysql://host
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]


def testing(table, hivedb, domain, port, mysqldb, username, password):
    print("********** table = {} **********".format(table))

    # Read the MySQL table over JDBC into a DataFrame.
    df = (sqlContext.read.format("jdbc")
          .option("url", "{}:{}/{}".format(domain, port, mysqldb))
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", table)
          .option("user", username)
          .option("password", password)
          .load())

    # Register the DataFrame as a temporary table.
    df.registerTempTable("mytempTable")

    # Create a Parquet-backed Hive table from the temp table.
    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb, table))


# Read the list of table names from HDFS, one name per line.
tables = sc.textFile('/user/XXXXXXX/spark_args/%s' % args_file).collect()

for table in tables:
    testing(table, hivedb, domain, port, mysqldb, username, password)

sc.stop()
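
Unlike the wrapper-driven version, this variant reads the table list itself from HDFS and reuses a single SparkContext for every table. A sketch of preparing the args file and submitting, assuming the file name, script name, and connection values are placeholders:

# Hypothetical args file: one MySQL table name per line.
printf 'employees\ndepartments\nsalaries\n' > tables.txt
hdfs dfs -put -f tables.txt /user/XXXXXXX/spark_args/tables.txt
spark-submit --master yarn-client \
  --jars /path/to/mysql-connector-java.jar \
  import.py tables.txt warehouse_db jdbc:mysql://db-host 3306 sales_db app_user app_pass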