import os
import sys

# Point Python at the local Spark installation before importing pyspark;
# these environment and path tweaks are useless if done after the imports.
os.environ['SPARK_HOME'] = "C:/Users/USER1/rcs/spark-2.1.0-bin-hadoop2.6"
os.environ['SPARK_CLASSPATH'] = "C:/Users/USER1/Documents/python/test/100_script_30_day_challenge/pyspark/postgresql-42.1.1.jre6.jar"
sys.path.append("C:/Users/USER1/rcs/spark-2.1.0-bin-hadoop2.6/python")
sys.path.append("C:/Users/USER1/rcs/spark-2.1.0-bin-hadoop2.6/python/lib/py4j-0.10.4-src.zip")

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf


# Masking helpers: each sensitive column is replaced by a placeholder derived
# from the account ID, except the SWIFT BIC, which gets an X-ed out suffix.
def update_STREET_ADDRESS(ACCOUNT_ID):
    return "Street Address for " + ACCOUNT_ID

def update_SECONDARY_ADDRESS(ACCOUNT_ID):
    return "Secondary Address for " + ACCOUNT_ID

def update_POSTAL_CODE(ACCOUNT_ID):
    return "Postal Code for " + ACCOUNT_ID

def update_CITY(ACCOUNT_ID):
    return "City for " + ACCOUNT_ID

def update_ZIP_CODE(ACCOUNT_ID):
    return "Zip Code for " + ACCOUNT_ID

def update_SWIFT_ADDR(SWIFT_ADDR):
    # Drop the last two characters of the BIC and pad with "XXXX".
    return SWIFT_ADDR[:-2] + "XXXX"

def update_TEL_NUM(ACCOUNT_ID):
    return "Tel No for " + ACCOUNT_ID

def update_EMAIL_ADDR(ACCOUNT_ID):
    return "Email ID for " + ACCOUNT_ID

def update_CNTCT_PRSN(ACCOUNT_ID):
    return "Contact Person for " + ACCOUNT_ID

def update_CMPNY_NAME(ACCOUNT_ID):
    return "Company Name " + ACCOUNT_ID

def update_FAX_NUM(ACCOUNT_ID):
    return "Fax Num " + ACCOUNT_ID

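# Quick illustrative examples of the masking (hypothetical input values):
#   update_SWIFT_ADDR("DEUTDEFF500")  -> "DEUTDEFF5XXXX"
#   update_STREET_ADDRESS("ACC001")   -> "Street Address for ACC001"
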
# Create one configured Spark context and session; the SQLContext is kept
# around for the UDF registrations further down.
conf = SparkConf() \
    .setAppName('Simple App') \
    .setMaster('local[*]') \
    .set('spark.sql.warehouse.dir', 'file:///C:/temp')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sqlContext = SQLContext(sc)


# Column names of the source table (kept for reference).
cols = ('ACCOUNT_ID', 'STREET_ADDRESS', 'SECONDARY_ADDRESS', 'POSTAL_CODE',
        'CITY', 'COUNTRY', 'COUNTRY_CODE', 'ZIP_CODE', 'SWIFT_ADDR',
        'TEL_NUM', 'EMAIL_ADDR', 'CNTCT_PRSN', 'CMPNY_NAME', 'FAX_NUM')


# JDBC connection details for the Postgres database. Depending on the
# Postgres authentication setup, a "password" entry may also be needed here.
url = "jdbc:postgresql://localhost/postgres"
properties = {
    "user": "pridash4",
    "driver": "org.postgresql.Driver"
}


# Read the BIC & account data from the database.
df = spark.read.jdbc(url=url, table='test_bics1', properties=properties)

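# For larger tables the JDBC read can be parallelised by partitioning on a
# numeric column; a sketch (assumes test_bics1 has a numeric key, here a
# hypothetical "row_id" column):
#
# df = spark.read.jdbc(url=url, table='test_bics1', column='row_id',
#                      lowerBound=1, upperBound=100000, numPartitions=4,
#                      properties=properties)
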
val1 = df.count()
print(val1)

df.createOrReplaceTempView("test_bics1")

# Register the masking functions as SQL UDFs so they can be used in the
# query below.
sqlContext.udf.register("update_STREET_ADDRESS_udf", update_STREET_ADDRESS, StringType())
sqlContext.udf.register("update_SECONDARY_ADDRESS_udf", update_SECONDARY_ADDRESS, StringType())
sqlContext.udf.register("update_POSTAL_CODE_udf", update_POSTAL_CODE, StringType())
sqlContext.udf.register("update_CITY_udf", update_CITY, StringType())
sqlContext.udf.register("update_ZIP_CODE_udf", update_ZIP_CODE, StringType())
sqlContext.udf.register("update_SWIFT_ADDR_udf", update_SWIFT_ADDR, StringType())
sqlContext.udf.register("update_TEL_NUM_udf", update_TEL_NUM, StringType())
sqlContext.udf.register("update_EMAIL_ADDR_udf", update_EMAIL_ADDR, StringType())
sqlContext.udf.register("update_CNTCT_PRSN_udf", update_CNTCT_PRSN, StringType())
sqlContext.udf.register("update_CMPNY_NAME_udf", update_CMPNY_NAME, StringType())
sqlContext.udf.register("update_FAX_NUM_udf", update_FAX_NUM, StringType())

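# The same masking can also be written with the DataFrame API instead of SQL,
# using the udf() wrapper imported above. A minimal one-column sketch
# (df_masked is illustrative only and not used further below):
mask_street_udf = udf(update_STREET_ADDRESS, StringType())
df_masked = df.withColumn("STREET_ADDRESS", mask_street_udf(df["ACCOUNT_ID"]))
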
df1 = sqlContext.sql("""
    select ACCOUNT_ID,
           update_STREET_ADDRESS_udf(ACCOUNT_ID)    as STREET_ADDRESS,
           update_SECONDARY_ADDRESS_udf(ACCOUNT_ID) as SECONDARY_ADDRESS,
           update_POSTAL_CODE_udf(ACCOUNT_ID)       as POSTAL_CODE,
           update_CITY_udf(ACCOUNT_ID)              as CITY,
           COUNTRY,
           COUNTRY_CODE,
           update_ZIP_CODE_udf(ACCOUNT_ID)          as ZIP_CODE,
           update_SWIFT_ADDR_udf(SWIFT_ADDR)        as SWIFT_ADDR,
           update_TEL_NUM_udf(ACCOUNT_ID)           as TEL_NUM,
           update_EMAIL_ADDR_udf(ACCOUNT_ID)        as EMAIL_ADDR,
           update_CNTCT_PRSN_udf(ACCOUNT_ID)        as CNTCT_PRSN,
           update_CMPNY_NAME_udf(ACCOUNT_ID)        as CMPNY_NAME,
           update_FAX_NUM_udf(ACCOUNT_ID)           as FAX_NUM
    from test_bics1
    limit 100
""")

# Write the masked rows to the database table test_bics2.
df1.write.mode("overwrite").jdbc(url=url, table="test_bics2", properties=properties)

# Count what actually landed in the target table. Note that the query above
# uses "limit 100", so the counts will only match for source tables with at
# most 100 rows.
val2 = spark.read.jdbc(url=url, table='test_bics2', properties=properties).count()
print(val2)

if val1 == val2:
    print("All records uploaded")
else:
    print("Record mismatch")
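
# Good practice: release the Spark resources once the job is done.
spark.stop()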