Guest User

Untitled

a guest
Nov 8th, 2017
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.15 KB | None | 0 0
  1. case class City(
  2. id: Int,
  3. name: String,
  4. iata: Option[String],
  5. longitude: Option[Double],
  6. latitude: Option[Double],
  7. updatedTimestamp: Long)
  8.  
  9. object udfHelpers {
  10. val toTimestamp = udf((date: String) => new SimpleDateFormat("yyyy-MM-dd HH:mm").parse(date).getTime)
  11. }
  12.  
  13. val spark = SparkSession
  14. .builder
  15. .appName("Spark Test")
  16. .master("local[*]")
  17. .getOrCreate
  18.  
  19. import spark.implicits._
  20.  
  21. def getSqlDf(host: String, port: Int, name: String, user: String, password: String, tableName: String): sql.DataFrame = {
  22. spark.read
  23. .format("jdbc")
  24. .option("driver", "org.postgresql.Driver")
  25. .option("url", s"jdbc:postgresql://${host}:${port}/${name}?zeroDateTimeBehavior=convertToNull&read_buffer_size=100M")
  26. .option("dbtable", tableName)
  27. .option("user", user)
  28. .option("password", password)
  29. .load
  30. }
  31.  
  32. val citiesDF = getSqlDf(
  33. host = Settings.geoDb.host,
  34. port = Settings.geoDb.port,
  35. name = Settings.geoDb.name,
  36. user = Settings.geoDb.user,
  37. password = Settings.geoDb.password,
  38. tableName = "geo.\"Cities\""
  39. ).withColumn("updatedTimestamp", udfHelpers.toTimestamp(citiesRaw("updatedAt")))
  40. .filter("iata IS null")
  41. .toDF.as[City].cache
Add Comment
Please, Sign In to add comment