Not a member of Pastebin yet? Sign up — it unlocks many cool features!
/** One row of the `geo."Cities"` table, as loaded by this job.
  *
  * Marked `final`: case classes should not be extended (equality/copy break
  * under inheritance).
  *
  * @param id               numeric id of the city row
  * @param name             city name
  * @param iata             optional IATA code (NULL in the source table maps to None)
  * @param longitude        optional longitude — presumably decimal degrees; TODO confirm against the source schema
  * @param latitude         optional latitude — presumably decimal degrees; TODO confirm against the source schema
  * @param updatedTimestamp epoch milliseconds derived from the table's textual `updatedAt` column
  */
final case class City(
    id: Int,
    name: String,
    iata: Option[String],
    longitude: Option[Double],
    latitude: Option[Double],
    updatedTimestamp: Long)
/** Spark UDF helpers for this ingestion job. */
object udfHelpers {
  // DateTimeFormatter is immutable and thread-safe, so it is built once and
  // shared; the previous per-call `new SimpleDateFormat` was wasteful, and a
  // hoisted SimpleDateFormat would not be thread-safe across executor threads.
  private val timestampFormat =
    java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")

  /** Parses a `"yyyy-MM-dd HH:mm"` string to epoch milliseconds in the JVM's
    * default time zone — the same result SimpleDateFormat.parse(...).getTime
    * produced for well-formed input. (java.time parsing is stricter: nonsense
    * dates that lenient SimpleDateFormat silently rolled over now throw.)
    *
    * @throws java.time.format.DateTimeParseException if `date` does not match the pattern
    */
  def parseToMillis(date: String): Long =
    java.time.LocalDateTime
      .parse(date, timestampFormat)
      .atZone(java.time.ZoneId.systemDefault())
      .toInstant
      .toEpochMilli

  /** UDF: textual timestamp column -> epoch millis (Long). */
  val toTimestamp = udf(parseToMillis _)
}
// Spark session for this job; "local[*]" runs in-process on all cores.
val spark =
  SparkSession.builder
    .master("local[*]")
    .appName("Spark Test")
    .getOrCreate

// Brings in the implicit Encoders needed for the typed .as[City] conversion below.
import spark.implicits._
/** Reads one table from a PostgreSQL database over JDBC.
  *
  * @param host      database host
  * @param port      database port
  * @param name      database name
  * @param user      connection user
  * @param password  connection password
  * @param tableName table to read, possibly schema-qualified and quoted (e.g. geo."Cities")
  * @return untyped DataFrame backed by a JDBC scan of the table
  */
def getSqlDf(host: String, port: Int, name: String, user: String, password: String, tableName: String): sql.DataFrame = {
  spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    // FIX: the original URL appended MySQL Connector/J parameters
    // (zeroDateTimeBehavior=convertToNull&read_buffer_size=100M) that the
    // PostgreSQL JDBC driver does not recognize; they are dropped here.
    .option("url", s"jdbc:postgresql://${host}:${port}/${name}")
    .option("dbtable", tableName)
    .option("user", user)
    .option("password", password)
    .load() // side-effecting 0-arity call: keep the parentheses
}
// FIX: the original expression referenced `citiesRaw`, which was never
// defined anywhere — bind the raw JDBC DataFrame first so the
// citiesRaw("updatedAt") column lookup actually compiles.
val citiesRaw = getSqlDf(
  host = Settings.geoDb.host,
  port = Settings.geoDb.port,
  name = Settings.geoDb.name,
  user = Settings.geoDb.user,
  password = Settings.geoDb.password,
  tableName = "geo.\"Cities\""
)

// Typed, cached dataset of cities that have no IATA code.
val citiesDF = citiesRaw
  // epoch millis parsed from the table's textual updatedAt column
  .withColumn("updatedTimestamp", udfHelpers.toTimestamp(citiesRaw("updatedAt")))
  // keep only rows with a NULL iata (Spark SQL keywords are case-insensitive)
  .filter("iata IS null")
  // the original's `.toDF` before `.as[City]` was a no-op: withColumn already
  // yields a DataFrame, so convert straight to the typed Dataset
  .as[City]
  .cache
Add Comment
Please sign in to add a comment.