#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Require the exact number of arguments on the spark-submit command line
if len(sys.argv) != 8:
    print "Invalid number of args......"
    print "Usage: spark-submit import.py args_file hivedb domain port mysqldb username password"
    exit()

args_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
username = sys.argv[6]
password = sys.argv[7]

def mysql_spark(table, hivedb, domain, port, mysqldb, username, password):

    print "******************************** table = {} ********************************".format(table)

    # Read the MySQL table over JDBC into a DataFrame
    df = sqlContext.read.format("jdbc") \
        .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", table) \
        .option("user", username) \
        .option("password", password) \
        .load()

    df.registerTempTable("mytempTable")

    # Copy the data into Hive as a Parquet table
    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb, table))

# File that contains the table names, one per line
tables = sc.textFile('/user/XXXXXXXX/mysql_spark/%s' % args_file).collect()

for table in tables:
    mysql_spark(table, hivedb, domain, port, mysqldb, username, password)

sc.stop()
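
For reference, a hypothetical spark-submit invocation matching the argument order checked above; the file name, database names, host, port, and credentials here are placeholders for illustration, not values from the original script (the domain argument is expected to carry the jdbc:mysql:// prefix, since the URL is built as domain:port/mysqldb):

    spark-submit import.py tables.txt analytics jdbc:mysql://dbhost.example.com 3306 sourcedb dbuser dbpass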