#!/usr/bin/env python
# sql_spark.py: import a single MySQL table into Hive over JDBC.
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# The job expects exactly seven arguments on the spark-submit command line.
if len(sys.argv) != 8:
    print "Invalid number of arguments"
    print "Usage: spark-submit sql_spark.py <table> <hivedb> <domain> <port> <mysqldb> <username> <password>"
    sys.exit(1)

table = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

# Read the MySQL table over JDBC. For the assembled URL to be valid,
# <domain> must include the "jdbc:mysql://" prefix.
df = sqlContext.read.format("jdbc") \
    .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", table) \
    .option("user", username) \
    .option("password", password) \
    .load()

# Register the dataframe as a temporary table ...
df.registerTempTable("mytempTable")

# ... and materialize it as a Hive table with CREATE TABLE AS SELECT.
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb, table))

sc.stop()
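
For reference, a matching spark-submit invocation might look like the sketch below. Every host, database, and credential value is a placeholder, the connector jar path is an assumption, and the domain argument carries the full jdbc:mysql:// prefix.

# Hypothetical invocation; every value below is a placeholder.
spark-submit --master yarn-client \
    --jars /path/to/mysql-connector-java.jar \
    sql_spark.py employees analytics jdbc:mysql://mysql-host 3306 hrdb dbuser dbpass
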
#!/bin/bash
# Driver script: reads a file of table names and launches one
# spark-submit per table, logging each job's outcome per day.
source /home/$USER/spark/source.sh

[ $# -ne 1 ] && { echo "Usage: $0 <table_list_file>"; exit 1; }
args_file=$1

TIMESTAMP=$(date "+%Y-%m-%d")
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log
touch "${success_logs}" "${failed_logs}"

# Record the exit status of each spark-submit; abort the whole run on the first failure.
function log_status
{
    status=$1
    message=$2
    if [ "$status" -ne 0 ]; then
        echo "$(date +"%Y-%m-%d %H:%M:%S") [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
        exit 1
    else
        echo "$(date +"%Y-%m-%d %H:%M:%S") [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
    fi
}

# Per-table job logs go here; create the directory up front.
mkdir -p /tmp/logging

# hivedb, domain, port, mysqldb, username and password are expected to be
# exported by source.sh above.
while read -r table; do
    spark-submit --name "${table}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py "${table}" "${hivedb}" "${domain}" "${port}" "${mysqldb}" "${username}" "${password}" > /tmp/logging/${table}.log 2>&1
    g_STATUS=$?
    log_status $g_STATUS "Spark job ${table} Execution"
done < "${args_file}"
echo "************************************************************************************************************************************************************************"
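
A minimal usage sketch, assuming the driver above is saved as import_tables.sh (the paste does not name the file) and that source.sh exports the connection variables; file and table names are placeholders:

# Illustrative only; file and table names are placeholders.
$ cat tables.txt
employees
departments
$ bash import_tables.sh tables.txt
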
#!/usr/bin/env python
# Variant: a single Spark application that imports every table listed in
# an args file on HDFS, instead of one spark-submit per table.
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# The job expects exactly seven arguments on the spark-submit command line.
if len(sys.argv) != 8:
    print "Invalid number of arguments"
    print "Usage: spark-submit import.py <args_file> <hivedb> <domain> <port> <mysqldb> <username> <password>"
    sys.exit(1)

args_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

def import_table(table, hivedb, domain, port, mysqldb, username, password):
    print "********** table = {} **********".format(table)
    # Read the MySQL table over JDBC ...
    df = sqlContext.read.format("jdbc") \
        .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", table) \
        .option("user", username) \
        .option("password", password) \
        .load()
    # ... register it as a temporary table ...
    df.registerTempTable("mytempTable")
    # ... and materialize it as a Parquet-backed Hive table.
    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb, table))

# Pull the table list down from HDFS: one table name per line.
tables = sc.textFile('/user/XXXXXXX/spark_args/%s' % args_file).collect()
for table in tables:
    import_table(table, hivedb, domain, port, mysqldb, username, password)

sc.stop()
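
Since this variant reads the table list from HDFS under /user/XXXXXXX/spark_args/, the file has to be uploaded there first. A hedged sketch, with every host, name, and credential a placeholder and the connector jar path an assumption:

# Illustrative only; all values are placeholders.
hdfs dfs -put tables.txt /user/XXXXXXX/spark_args/tables.txt
spark-submit --master yarn-client \
    --jars /path/to/mysql-connector-java.jar \
    import.py tables.txt analytics jdbc:mysql://mysql-host 3306 hrdb dbuser dbpass
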