Advertisement
Guest User

Untitled

a guest
Apr 11th, 2017
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.10 KB | None | 0 0
  // Exports every person with their distinct phone numbers and resumes from
  // SQL Server into the Solr collection "default".
  // Expects `sqlContext` and `collect_set` to be in scope, as in spark-shell.

  // NOTE(review): the credentials are inlined in the URL; confirm the real
  // values are injected from configuration rather than committed in source.
  val jdbcUrl = "jdbc:sqlserver://SERVER_IP;user=USERNAME;password=PASSWORD"

  /** Loads `table` over JDBC, range-partitioned on `Person_Number`.
    *
    * The partition upper bound is the table's current `max(Person_Number)`,
    * fetched with a one-row query before the partitioned read is defined.
    *
    * @param table         fully qualified table name, e.g. `[Export].[dbo].[Person]`
    * @param numPartitions number of JDBC partitions to read with
    * @param fetchSize     JDBC fetch size (rows per round trip)
    * @return a DataFrame over `table`
    */
  def loadPartitioned(table: String, numPartitions: Int, fetchSize: Int): org.apache.spark.sql.DataFrame = {
    val maxIdQuery = s"(select max(Person_Number) as maxId from $table) as tmp"
    val maxIdRow = sqlContext.read.format("jdbc").options(Map("url" -> jdbcUrl, "dbtable" -> maxIdQuery)).load().select("maxId").head()
    // max() is NULL for an empty table; fall back to "1" instead of throwing a
    // NullPointerException, keeping upperBound above lowerBound "0".
    val upperBound = Option(maxIdRow.get(0)).fold("1")(_.toString)
    val readOpts = Map(
      "url" -> jdbcUrl,
      "dbtable" -> table,
      "partitionColumn" -> "Person_Number",
      "numPartitions" -> numPartitions.toString,
      "lowerBound" -> "0",
      "upperBound" -> upperBound,
      // The documented option key is lowercase "fetchsize".
      "fetchsize" -> fetchSize.toString
    )
    sqlContext.read.format("jdbc").options(readOpts).load()
  }

  val jdbcDF = loadPartitioned("[Export].[dbo].[Person]", 4, 1000)
  jdbcDF.registerTempTable("Person")
  val jdbcDF2 = loadPartitioned("[Export].[dbo].[Person_Telephone]", 4, 1000)
  val jdbcDF3 = loadPartitioned("[Export].[dbo].[Person_Resume]", 10, 100)

  // Aggregate each child table on its own. Joining phones AND resumes onto
  // Person before grouping produces (#phones x #resumes) rows per person.
  // Starting from Person keeps people without phones or resumes in the output.
  val personIds = jdbcDF.select("Person_Number")

  val phoneRows = personIds.join(jdbcDF2.select("Person_Number", "Telephone_Number"), Seq("Person_Number"), "left_outer")
  // Alias the aggregate directly rather than renaming the auto-generated
  // "collect_set(...)" column; withColumnRenamed is a silent no-op on a miss.
  val joinDFPhone = phoneRows.groupBy("Person_Number").agg(collect_set("Telephone_Number").as("Telephone_Number_ss"))

  val resumeRows = personIds.join(jdbcDF3.select("Person_Number", "Resume"), Seq("Person_Number"), "left_outer")
  val joinDFResume = resumeRows.groupBy("Person_Number").agg(collect_set("Resume").as("Resume_txt"))

  // One row per person: id, Telephone_Number_ss, Resume_txt.
  val res = joinDFPhone.join(joinDFResume, Seq("Person_Number"), "left").withColumnRenamed("Person_Number", "id")
  res.write.format("solr").option("collection", "default").save()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement