Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import org.apache.spark.sql.DataFrame

// Connection URL for the SQL Server export database. It embeds credentials, so prefer
// supplying it via the JDBC_URL environment variable rather than editing it in here;
// the placeholder below is kept as the default so existing usage is unchanged.
val jdbcUrl = sys.env.getOrElse("JDBC_URL", "jdbc:sqlserver://SERVER_IP;user=USERNAME;password=PASSWORD")

/** Returns the (min, max) of `Person_Number` in `table`, as strings for the JDBC options.
  *
  * These values are only used as partition-stride hints: Spark's JDBC source does not
  * filter rows by lowerBound/upperBound, so every row is still read.
  * If the table is empty, SQL `min`/`max` yield NULL; in that case the bounds fall back to
  * "0" instead of throwing a NullPointerException.
  */
def personNumberBounds(table: String): (String, String) = {
  val row = sqlContext.read.format("jdbc")
    .options(Map(
      "url" -> jdbcUrl,
      "dbtable" -> s"(select min(Person_Number) as minId, max(Person_Number) as maxId from $table) as tmp"
    ))
    .load()
    .first()
  val lower = Option(row.get(0)).map(_.toString).getOrElse("0")
  val upper = Option(row.get(1)).map(_.toString).getOrElse(lower)
  (lower, upper)
}

/** Loads `table` over JDBC, reading it in parallel partitions split on `Person_Number`.
  *
  * @param table         fully qualified SQL Server table name, e.g. "[Export].[dbo].[Person]"
  * @param numPartitions number of parallel JDBC reads
  * @param fetchSize     JDBC fetch size (rows per round trip)
  */
def loadPartitioned(table: String, numPartitions: Int, fetchSize: Int): DataFrame = {
  val (lowerBound, upperBound) = personNumberBounds(table)
  sqlContext.read.format("jdbc").options(Map(
    "url" -> jdbcUrl,
    "dbtable" -> table,
    "partitionColumn" -> "Person_Number",
    "numPartitions" -> numPartitions.toString,
    "lowerBound" -> lowerBound,
    "upperBound" -> upperBound,
    "fetchSize" -> fetchSize.toString
  )).load()
}

val jdbcDF = loadPartitioned("[Export].[dbo].[Person]", numPartitions = 4, fetchSize = 1000)
// NOTE(review): registerTempTable is deprecated in Spark 2.x (createOrReplaceTempView);
// kept because this script targets the SQLContext API.
jdbcDF.registerTempTable("Person")
val jdbcDF2 = loadPartitioned("[Export].[dbo].[Person_Telephone]", numPartitions = 4, fetchSize = 1000)
// Resume rows are presumably large text, hence more partitions and a smaller fetch size.
val jdbcDF3 = loadPartitioned("[Export].[dbo].[Person_Resume]", numPartitions = 10, fetchSize = 100)

val personIds = jdbcDF.select("Person_Number")

/** For every person id, collects the distinct non-null values of `valueCol` from `child`,
  * returned as column `alias`.
  *
  * Each child table is aggregated on its own. Joining both children at once would produce
  * phones x resumes rows per person before grouping. The left join keeps people without
  * child rows; for them, collect_set yields an empty set.
  */
def collectPerPerson(child: DataFrame, valueCol: String, alias: String): DataFrame =
  personIds
    .join(child.select("Person_Number", valueCol), Seq("Person_Number"), "left_outer")
    .groupBy("Person_Number")
    .agg(collect_set(valueCol).as(alias)) // explicit alias: generated names vary across Spark versions

val phones = collectPerPerson(jdbcDF2, "Telephone_Number", "Telephone_Number_ss")
val resumes = collectPerPerson(jdbcDF3, "Resume", "Resume_txt")

// One Solr document per person: id, Telephone_Number_ss, Resume_txt.
val res = phones
  .join(resumes, Seq("Person_Number"), "left_outer")
  .withColumnRenamed("Person_Number", "id")

res.write.format("solr").option("collection", "default").save()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement