Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
  * Loads the stations and temperatures CSV resources, joins them on the
  * station identifier (`stn`, `wban`) and returns one record per reading.
  *
  * Rows with a missing latitude/longitude (stations) or a missing
  * month/day/temperature (readings) are discarded before the join.
  *
  * @param year             year used for every returned date
  * @param stationsFile     classpath resource path of the stations CSV
  *                         (columns: stn, wban, lat, lon)
  * @param temperaturesFile classpath resource path of the temperatures CSV
  *                         (columns: stn, wban, month, day, temp)
  * @return one (date, station location, temperature converted from Fahrenheit
  *         to Celsius) triple per joined reading
  * @throws IllegalArgumentException if a resource cannot be found on the classpath
  */
def locateTemperatures(year: Year, stationsFile: String, temperaturesFile: String): Iterable[(LocalDate, Location, Temperature)] = {

  // Resolves a classpath resource to a file-system path string.
  // getResource returns null for an unknown resource; fail with a message
  // naming the resource instead of an opaque NullPointerException.
  def getFilePath(resourcePath: String): String = {
    val url = Option(getClass.getResource(resourcePath)).getOrElse(
      throw new IllegalArgumentException(s"Resource not found on classpath: $resourcePath"))
    Paths.get(url.toURI).toString
  }

  def fahrenheitToCelsius(fahrenheit: Double): Double = (fahrenheit - 32.0) * 5.0 / 9.0

  // Reads a CSV resource with an explicit schema.
  def readCsv(resourcePath: String, schema: StructType) =
    spark.read.format("csv").schema(schema).load(getFilePath(resourcePath))

  val stationSchema = StructType(Seq(
    StructField("stn", StringType, nullable = true),
    StructField("wban", StringType, nullable = true),
    StructField("lat", DoubleType, nullable = true),
    StructField("lon", DoubleType, nullable = true)
  ))

  val temperatureSchema = StructType(Seq(
    StructField("stn", StringType, nullable = true),
    StructField("wban", StringType, nullable = true),
    StructField("month", IntegerType, nullable = true),
    StructField("day", IntegerType, nullable = true),
    StructField("temp", DoubleType, nullable = true)
  ))

  // After dropping incomplete rows, the remaining null string columns
  // (stn / wban) are replaced by "" so that a station with only one of the
  // two identifiers still matches in the join (null keys never compare equal).
  val stationsDF = readCsv(stationsFile, stationSchema)
    .na.drop(Seq("lat", "lon"))
    .na.fill("")
  val temperaturesDF = readCsv(temperaturesFile, temperatureSchema)
    .na.drop(Seq("month", "day", "temp"))
    .na.fill("")

  // NOTE(review): if the data set marks missing temperatures with a sentinel
  // value, such rows are not filtered here — confirm against the data spec.
  temperaturesDF
    .join(stationsDF, usingColumns = Seq("stn", "wban"))
    // Keep only the columns that are needed and read them by name, so the
    // result does not depend on the column order produced by the join.
    .select("month", "day", "temp", "lat", "lon")
    .collect()
    .map { row =>
      (
        LocalDate.of(year, row.getAs[Int]("month"), row.getAs[Int]("day")),
        Location(row.getAs[Double]("lat"), row.getAs[Double]("lon")),
        fahrenheitToCelsius(row.getAs[Double]("temp"))
      )
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement