#show raw data is coming through
#display(parsedData)
contents = parsedData

columns = [
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_Type', 'type': str},
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_NoOfVehiclesAvailableToFamily', 'type': int},
    {'path': './SchemeResult/PolData/Driver/Driver_Title', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ForenameInitial1', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_Surname', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_RelationshipToProposer', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_Sex', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ClassOfUse', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ClaimsInd', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ConvictionsInd', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Occupation/Occupation_Code', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Ncd/Ncd_ClaimedYears', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Ncd/Ncd_GrantedYears', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_DateOfBirth', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_AddressLine1', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_AddressLine2', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_PostCodeFull', 'type': str},
    {'path': './SchemeResult/PolData/CalculatedResult/PremiumOptions/PremiumOptions_PremiumAmountExclusiveOfIpt', 'type': str},
    {'path': './SchemeResult/PolData/CalculatedResult/PremiumOptions/PremiumOptions_PremiumAmountInclusiveOfIpt', 'type': str},
    {'path': './SchemeResult/PolData/Intermediary/Intermediary_Code', 'type': str},
    {'path': './SchemeResult/PolData/ProcessingIndicators/ProcessingIndicators_ProcessType', 'type': str},
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_PostCodeSector', 'type': str},
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_NoAddressLines', 'type': int},
    # {'path': './SchemeResult/PolData/Vehicle/Vehicle_NoDrivers', 'type': int},
    {'path': './SchemeResult/PolData/Vehicle/Vehicle_Model', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Vehicle_LocationKeptOvernight', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Vehicle_RegNo', 'type': str}
]


#import xml tree library
import xml.etree.ElementTree as ET

#define our function, which takes xml as an input. It pulls the payload out of the
#CDATA section, strips backslash escapes from embedded quotes, and parses the result
#as XML. Error handling returns None for any column whose path is not found.
def data_from_xml(xml):
    data = xml.split('<![CDATA[')[1].split(']]>')[0].replace('\\"', '"')
    tree = ET.fromstring(data)
    res = []
    for c in columns:
        try:
            res.append(c['type'](tree.findall(c['path'])[0].attrib['Val']))
        except Exception:
            res.append(None)
    return res

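# A quick sanity check of data_from_xml against a hand-built message. The XML shape
# below is inferred from the paths in `columns` (leaf elements carrying a Val
# attribute) and is only an assumption about the real feed; unmatched paths come
# back as None:
# _sample = ('<Message><![CDATA[<Quote><SchemeResult><PolData><Driver>'
#            '<Driver_Title Val="Mr"/><Driver_Surname Val="Smith"/>'
#            '</Driver></PolData></SchemeResult></Quote>]]></Message>')
# data_from_xml(_sample)  # -> ['Mr', None, 'Smith', None, ...] for the columns above
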
#user defined function: udf wraps a plain Python function so it can be applied to
#DataFrame columns in Databricks/Spark.
from pyspark.sql.functions import udf
from pyspark.sql.types import *

#map the python types used in the column config to Spark SQL types
type_map = {
    int: IntegerType,
    str: StringType,
    float: FloatType,
}

#define our schema: one StructField per configured column, named after the last path
#segment (lower-cased). Fields are nullable because data_from_xml returns None when
#a path is missing.
schema = StructType([StructField(c['path'].split('/')[-1].lower(), type_map[c['type']](), True) for c in columns])
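# For example, './SchemeResult/PolData/Driver/Driver_Title' becomes the field
# 'driver_title' and './SchemeResult/PolData/Vehicle/Vehicle_RegNo' becomes
# 'vehicle_regno'.
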

#new udf version of data_from_xml, which uses the schema to define the result columns
xml_udf = udf(data_from_xml, schema)

processed = contents.select(xml_udf("message").alias("message_data")).select("message_data.*")
#display(processed)
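# xml_udf returns a single struct column per row; selecting "message_data.*"
# expands that struct so each schema field becomes its own top-level column.
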

#need to create a column called QuoteID (guid/uniqueidentifier) and add to the processed df
import uuid
from pyspark.sql.functions import udf

uuidUdf = udf(lambda: str(uuid.uuid4()), StringType())
processedid = processed.withColumn("quoteid", uuidUdf())
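# Note: Spark may re-evaluate a UDF if the plan references it more than once, which
# could regenerate the GUID. If each row must keep a single stable value, the UDF
# can be marked nondeterministic (Spark 2.3+), e.g.:
# uuidUdf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()
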

#display(processedid)

#create hash values in additional columns within the processed df
from pyspark.sql.functions import sha1, concat_ws, col, trim
hasheddf = (processedid
    .withColumn('driverhashkey', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth")))))
    .withColumn('vehiclehashkey', trim(sha1(concat_ws("||", col("Vehicle_Model"), col("Vehicle_RegNo")))))
    .withColumn('quotehashkey', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth"), col("Vehicle_Model"), col("Vehicle_RegNo"), col("quoteid")))))
    .withColumn('driverhashdiffslow', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth"), col("Driver_Title"), col("Driver_RelationshipToProposer"), col("Driver_Sex"), col("Occupation_Code")))))
    .withColumn('driverhashdifffast', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth"), col("Driver_ClassOfUse"), col("Ncd_ClaimedYears"), col("Ncd_GrantedYears"), col("Driver_ClaimsInd"), col("Driver_ConvictionsInd")))))
    .withColumn('vehiclehashdiff', trim(sha1(concat_ws("||", col("Vehicle_Model"), col("Vehicle_RegNo"), col("ProposerPolicyholder_AddressLine1"), col("ProposerPolicyholder_AddressLine2"), col("ProposerPolicyholder_PostCodeFull"), col("Vehicle_LocationKeptOvernight"))))))
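# Each key/diff column is a SHA-1 over the listed fields joined with "||"; for
# example, a driver with initial "J", surname "Smith" and date of birth
# "1980-01-01" (made-up values) gets driverhashkey = sha1("J||Smith||1980-01-01").
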


#connect to sql db and populate table
user = dbutils.secrets.get(scope = "jdbcconnect", key = "username")
password = dbutils.secrets.get(scope = "jdbcconnect", key = "password")

connectionProperties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}

jdbcUrl = "jdbc:sqlserver://ageas-ihpcloud-poc-sqlserver.database.windows.net:1433;database=ageas-ihpcloud-poc-sqldb;user={0};password={1}".format(user, password)

#append one batch of quotes to the Raw.PC_Driver_Quote_Vehicle table
def writeToSQL(hasheddf, epochId):
    hasheddf.write.mode('append').jdbc(url=jdbcUrl, table="Raw.PC_Driver_Quote_Vehicle", properties=connectionProperties)
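# writeToSQL has the (batch_df, epoch_id) signature used by Structured Streaming's
# foreachBatch sink. If the quotes arrive as a stream, it could be wired up roughly
# like this (a sketch only -- the checkpoint path is an assumption):
# (hasheddf.writeStream
#     .foreachBatch(writeToSQL)
#     .option("checkpointLocation", "/mnt/checkpoints/pc_driver_quote_vehicle")
#     .start())
# For a one-off batch load, writeToSQL(hasheddf, None) writes the DataFrame once.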