# show that the raw data is coming through
# display(parsedData)
contents = parsedData
columns = [
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_Type', 'type': str},
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_NoOfVehiclesAvailableToFamily', 'type': int},
    {'path': './SchemeResult/PolData/Driver/Driver_Title', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ForenameInitial1', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_Surname', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_RelationshipToProposer', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_Sex', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ClassOfUse', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ClaimsInd', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Driver_ConvictionsInd', 'type': str},
    {'path': './SchemeResult/PolData/Driver/Occupation/Occupation_Code', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Ncd/Ncd_ClaimedYears', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Ncd/Ncd_GrantedYears', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_DateOfBirth', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_AddressLine1', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_AddressLine2', 'type': str},
    {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_PostCodeFull', 'type': str},
    {'path': './SchemeResult/PolData/CalculatedResult/PremiumOptions/PremiumOptions_PremiumAmountExclusiveOfIpt', 'type': str},
    {'path': './SchemeResult/PolData/CalculatedResult/PremiumOptions/PremiumOptions_PremiumAmountInclusiveOfIpt', 'type': str},
    {'path': './SchemeResult/PolData/Intermediary/Intermediary_Code', 'type': str},
    {'path': './SchemeResult/PolData/ProcessingIndicators/ProcessingIndicators_ProcessType', 'type': str},
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_PostCodeSector', 'type': str},
    # {'path': './SchemeResult/PolData/ProposerPolicyholder/ProposerPolicyholder_NoAddressLines', 'type': int},
    # {'path': './SchemeResult/PolData/Vehicle/Vehicle_NoDrivers', 'type': int},
    {'path': './SchemeResult/PolData/Vehicle/Vehicle_Model', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Vehicle_LocationKeptOvernight', 'type': str},
    {'path': './SchemeResult/PolData/Vehicle/Vehicle_RegNo', 'type': str}
]
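# For reference, a minimal sketch of the XML shape those paths assume: each
# leaf element carries its value in a 'Val' attribute, and the whole payload
# arrives wrapped in CDATA. The <Root> element name and the 'Smith' value are
# hypothetical; a real message carries far more fields.
sample_message = '''<![CDATA[<Root>
  <SchemeResult>
    <PolData>
      <Driver>
        <Driver_Surname Val="Smith"/>
      </Driver>
    </PolData>
  </SchemeResult>
</Root>]]>'''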
# import the XML tree library
import xml.etree.ElementTree as ET

# Define our parsing function, which takes one XML message as input. It
# extracts the payload between the CDATA markers, unescapes any
# backslash-escaped quotes, then reads the 'Val' attribute for each
# configured column, with error handling in case a field is not found.
def data_from_xml(xml):
    data = xml.split('<![CDATA[')[1].split(']]>')[0].replace('\\"', '"')
    tree = ET.fromstring(data)
    res = []
    for c in columns:
        try:
            res.append(c['type'](tree.find(c['path']).attrib['Val']))
        except (AttributeError, KeyError, ValueError):
            res.append(None)
    return res
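# Quick local sanity check against the hypothetical sample_message above:
# Driver_Surname is the third active column, so index 2 should be 'Smith'
# and every other field should come back as None.
assert data_from_xml(sample_message)[2] == 'Smith'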
# Register the parser as a user-defined function (UDF), which lets a plain
# Python function be applied to DataFrame columns in Databricks.
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# map our Python data types to their Spark SQL equivalents
type_map = {
    int: IntegerType,
    str: StringType,
    float: FloatType,
}

# Define our schema. Each field is named after the lowercased last segment of
# its XML path (e.g. '.../Driver/Driver_Surname' -> 'driver_surname') and is
# nullable, since the parser returns None for anything it cannot extract.
schema = StructType([StructField(c['path'].split('/')[-1].lower(), type_map[c['type']](), True) for c in columns])

# UDF version of data_from_xml, which uses the schema to define the result columns
xml_udf = udf(data_from_xml, schema)

processed = contents.select(xml_udf("message").alias("message_data")).select("message_data.*")
# display(processed)
# Create a quoteid column (a GUID/uniqueidentifier) and add it to the processed df.
import uuid
from pyspark.sql.functions import udf

# Mark the UDF non-deterministic so Spark does not re-evaluate it and hand
# the same row different UUIDs under different query plans.
uuidUdf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()
processedid = processed.withColumn("quoteid", uuidUdf())
# display(processedid)
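# A lighter-weight alternative, assuming the cluster's Spark exposes the
# built-in SQL uuid() function: it skips the Python UDF round trip entirely.
# from pyspark.sql.functions import expr
# processedid = processed.withColumn("quoteid", expr("uuid()"))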
# Create hash values in additional columns within the processed df. The
# chained withColumn calls are wrapped in parentheses so they can span lines,
# and trim is imported alongside sha1/concat_ws/col. Column lookups are
# case-insensitive by default, so the mixed-case names below resolve against
# the lowercased schema fields.
from pyspark.sql.functions import sha1, concat_ws, col, trim
hasheddf = (processedid
    .withColumn('driverhashkey', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth")))))
    .withColumn('vehiclehashkey', trim(sha1(concat_ws("||", col("Vehicle_Model"), col("Vehicle_RegNo")))))
    .withColumn('quotehashkey', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth"), col("Vehicle_Model"), col("Vehicle_RegNo"), col("QuoteID")))))
    .withColumn('driverhashdiffslow', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth"), col("Driver_Title"), col("Driver_RelationshipToProposer"), col("Driver_Sex"), col("Occupation_Code")))))
    .withColumn('driverhashdifffast', trim(sha1(concat_ws("||", col("Driver_ForenameInitial1"), col("Driver_Surname"), col("ProposerPolicyholder_DateOfBirth"), col("Driver_ClassOfUse"), col("Ncd_ClaimedYears"), col("Ncd_GrantedYears"), col("Driver_ClaimsInd"), col("Driver_ConvictionsInd")))))
    .withColumn('vehiclehashdiff', trim(sha1(concat_ws("||", col("Vehicle_Model"), col("Vehicle_RegNo"), col("ProposerPolicyHolder_AddressLine1"), col("ProposerPolicyHolder_AddressLine2"), col("ProposerPolicyholder_PostCodeFull"), col("Vehicle_LocationKeptOvernight"))))))
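# Hashing sanity check (pure Python, made-up values): Spark's
# sha1(concat_ws("||", ...)) is a hex digest over the "||"-joined string, so
# it can be reproduced locally with hashlib. Note that concat_ws silently
# skips NULL columns, so rows missing a field hash as if it were absent.
import hashlib
expected_driverhashkey = hashlib.sha1("J||Smith||1980-01-01".encode("utf-8")).hexdigest()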
# Connect to the SQL database and populate the table.
user = dbutils.secrets.get(scope="jdbcconnect", key="username")
password = dbutils.secrets.get(scope="jdbcconnect", key="password")
connectionProperties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
jdbcUrl = "jdbc:sqlserver://ageas-ihpcloud-poc-sqlserver.database.windows.net:1433;database=ageas-ihpcloud-poc-sqldb;user={0};password={1}".format(user, password)

# Batch writer. DataFrameWriter.jdbc() performs the write itself and returns
# None, so no trailing .save() call is needed (one would raise an error).
def writeToSQL(hasheddf, epochId):
    hasheddf.write.mode('append').jdbc(url=jdbcUrl, table="Raw.PC_Driver_Quote_Vehicle", properties=connectionProperties)
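# The (DataFrame, epochId) signature suggests writeToSQL is intended as a
# Structured Streaming foreachBatch sink. A hypothetical hookup, assuming
# hasheddf is a streaming DataFrame and using a made-up checkpoint path:
# query = (hasheddf.writeStream
#     .foreachBatch(writeToSQL)
#     .option("checkpointLocation", "/mnt/checkpoints/pc_driver_quote_vehicle")
#     .start())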