Advertisement
Guest User

Untitled

a guest
May 12th, 2017
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.88 KB | None | 0 0
  1. import os
  2. import sys
  3. from pyspark import SparkContext
  4. from pyspark import SparkConf
  5. from pyspark.sql import SQLContext
  6. from pyspark.sql import SparkSession
  7. from pyspark.sql import DataFrameReader
  8.  
  # Compare the row count of test_bank_dat.csv with the PostgreSQL table test_bics1.
#
# The script reads the CSV into a Spark DataFrame, renames its columns to the
# names in `cols`, reads the table `test_bics1` over JDBC and reports whether
# both row counts agree.  All Spark work happens inside main(), so importing
# this module has no side effects.

# Location of the Spark distribution and of the PostgreSQL JDBC driver jar.
SPARK_HOME = "C:/Users/USER1/rcs/spark-2.1.0-bin-hadoop2.6"
JDBC_DRIVER_JAR = "C:/Users/USER1/Documents/python/test/100_script_30_day_challenge/pyspark/postgresql-42.1.1.jre6.jar"

# Column names applied (positionally) to the DataFrame loaded from the CSV.
cols = (
    'ACCOUNT_ID', 'STREET_ADDRESS', 'SECONDARY_ADDRESS', 'POSTAL_CODE',
    'CITY', 'COUNTRY', 'COUNTRY_CODE', 'ZIP_CODE', 'SWIFT_ADDR', 'TEL_NUM',
    'EMAIL_ADDR', 'CNTCT_PRSN', 'CMPNY_NAME', 'FAX_NUM',
)

# JDBC properties for the DB connection.
url = "jdbc:postgresql://localhost/postgres"
properties = {
    "user": "pridash4",
    "driver": "org.postgresql.Driver",
}


def configure_spark_environment():
    """Export the Spark environment variables and extend ``sys.path``.

    Must run before the Spark session is created: the original script did
    this only after a SparkContext already existed, which is too late for
    the settings to influence how Spark is started.
    """
    os.environ['SPARK_HOME'] = SPARK_HOME
    os.environ['SPARK_CLASSPATH'] = JDBC_DRIVER_JAR
    for entry in (
        SPARK_HOME + "/python",
        SPARK_HOME + "/python/lib/py4j-0.10.4-src.zip",
    ):
        # Skip entries that are already present so repeated calls do not
        # keep growing sys.path.
        if entry not in sys.path:
            sys.path.append(entry)


def count_report(csv_count, table_count):
    """Return the status message for the two row counts.

    Args:
        csv_count: number of rows loaded from the CSV file.
        table_count: number of rows read back from the database table.

    Returns:
        "All records uploaded" when the counts are equal, otherwise
        "Record mismatch".
    """
    if csv_count == table_count:
        return "All records uploaded"
    return "Record mismatch"


def main():
    """Load the CSV, read the database table and print the count comparison."""
    configure_spark_environment()

    # Imported here, after the environment is configured, so the sys.path
    # entries added above are in effect for this import.
    from pyspark.sql import SparkSession

    # Create exactly one session.  The original built a SparkContext and a
    # default session first, so this builder's master/appName were silently
    # ignored by getOrCreate().
    spark = (
        SparkSession.builder
        .master('local[*]')
        .appName('My App')
        .config('spark.sql.warehouse.dir', 'file:///C:/temp')
        .getOrCreate()
    )

    # spark.read yields a DataFrame (not an RDD); the header row supplies the
    # initial column names.
    accounts_df = (
        spark.read
        .format('csv')
        .option('header', 'true')
        .load('test_bank_dat.csv')
    )
    # show() prints the table itself and returns None, so it is not wrapped
    # in print (which would emit a stray "None").
    accounts_df.show()

    # Rename the columns positionally to the names listed in `cols`.
    df = accounts_df.toDF(*cols)
    df.show()

    # The upload step is disabled in the original script and is kept
    # disabled here; enable it to (re)write table test_bics1 from the CSV:
    # df.write.mode("overwrite").jdbc(url=url, table="test_bics1", properties=properties)
    val1 = df.count()
    print("count: %s" % val1)

    # Read the table back through the session's reader.
    table_df = spark.read.jdbc(url=url, table='test_bics1', properties=properties)
    val2 = table_df.count()
    print(val2)

    print(count_report(val1, val2))


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement