Advertisement
Guest User

Untitled

a guest
Jul 24th, 2019
238
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.44 KB | None | 0 0
  /** Reads one year's station and temperature CSV resources and joins each reading to its station.
    *
    * Stations without coordinates are dropped. Readings without month/day/temperature, or carrying the
    * missing-temperature sentinel, are dropped. The remaining readings are inner-joined to stations on (stn, wban).
    *
    * @param year             year of the readings; combined with each row's month and day to build the date
    * @param stationsFile     classpath resource path of the stations CSV (columns: stn, wban, lat, lon)
    * @param temperaturesFile classpath resource path of the temperatures CSV (columns: stn, wban, month, day, temp)
    * @return one (date, location, temperature in degrees Celsius) triple per matched reading
    * @throws IllegalArgumentException if either resource cannot be found on the classpath
    */
  def locateTemperatures(year: Year, stationsFile: String, temperaturesFile: String): Iterable[(LocalDate, Location, Temperature)] = {

    // NOTE(review): assumes the NOAA GSOD / capstone convention that a temp of 9999.9 means "missing" — confirm.
    val missingTemperatureF = 9999.9

    /** Resolves a classpath resource to a filesystem path string that Spark can load. */
    def getFilePath(resourcePath: String): String =
      Option(getClass.getResource(resourcePath)) // getResource returns null when the resource is absent
        .map(url => Paths.get(url.toURI).toString)
        .getOrElse(throw new IllegalArgumentException(s"Resource not found on classpath: $resourcePath"))

    /** Converts a temperature from degrees Fahrenheit to degrees Celsius. */
    def fahrenheitToCelsius(fahrenheit: Double): Double = (fahrenheit - 32.0) * 5.0 / 9.0

    val stationSchema = StructType(Seq(
      StructField("stn", StringType, nullable = true),
      StructField("wban", StringType, nullable = true),
      StructField("lat", DoubleType, nullable = true),
      StructField("lon", DoubleType, nullable = true)
    ))
    val temperatureSchema = StructType(Seq(
      StructField("stn", StringType, nullable = true),
      StructField("wban", StringType, nullable = true),
      StructField("month", IntegerType, nullable = true),
      StructField("day", IntegerType, nullable = true),
      StructField("temp", DoubleType, nullable = true)
    ))

    val stationsDF = spark.read
      .format("csv")
      .schema(stationSchema)
      .load(getFilePath(stationsFile))
      .na.drop(Seq("lat", "lon")) // a station without coordinates cannot be located
      .na.fill("")                // null never equals null in an equi-join; "" lets stn-only / wban-only ids match

    val rawTemperaturesDF = spark.read
      .format("csv")
      .schema(temperatureSchema)
      .load(getFilePath(temperaturesFile))
      .na.drop(Seq("month", "day", "temp"))
      .na.fill("")
    val temperaturesDF = rawTemperaturesDF.filter(rawTemperaturesDF("temp") =!= missingTemperatureF)

    temperaturesDF
      .join(stationsDF, usingColumns = Seq("stn", "wban"))
      .collect()
      .map { row =>
        // Access by column name so the mapping does not depend on the column order the join produces.
        val date = LocalDate.of(year, row.getAs[Int]("month"), row.getAs[Int]("day"))
        val location = Location(row.getAs[Double]("lat"), row.getAs[Double]("lon"))
        (date, location, fahrenheitToCelsius(row.getAs[Double]("temp")))
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement