Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- import org.apache.spark.sql.types._
- import org.apache.spark.sql.functions._
- import org.apache.spark.sql.Column
- import org.apache.spark.sql.Dataset
- import org.apache.spark.sql.Row
- import org.apache.spark.sql.functions.udf
/**
 * Finds "return trips" in a taxi-trip dataset.
 *
 * A pair of trips (A, B) is reported when all of the following hold:
 *  - B is picked up strictly after A is dropped off, and less than 8 hours later;
 *  - B's pick-up point is closer than `dist` metres to A's drop-off point;
 *  - B's drop-off point is closer than `dist` metres to A's pick-up point.
 *
 * Candidate pairs are narrowed with a latitude-bucket equi-join before the exact
 * haversine checks are evaluated.
 */
object ReturnTrips {

  /** Earth radius in kilometres used by the haversine formula. */
  private val EarthRadiusKm = 6371.0

  /** Length of one degree of latitude in metres (pi * R / 180, with R converted to metres). */
  private val MetresPerDegreeLatitude = math.Pi * EarthRadiusKm * 1000.0 / 180.0

  /** Maximum gap between A's drop-off and B's pick-up: 8 hours, expressed in seconds. */
  private val MaxGapSeconds = 8 * 60 * 60

  /**
   * Haversine great-circle distance between two points, built as a Spark column expression.
   *
   * Inputs are passed through `radians`, so they are assumed to be in degrees.
   *
   * @return a column holding the distance in kilometres
   */
  private def haversineKm(latA: Column, lonA: Column, latB: Column, lonB: Column): Column = {
    val dLat = radians(latA - latB)
    val dLon = radians(lonB - lonA)
    val a = sin(dLat / 2) * sin(dLat / 2) +
      cos(radians(latA)) * cos(radians(latB)) * sin(dLon / 2) * sin(dLon / 2)
    atan2(sqrt(a), sqrt(-a + 1)) * 2 * EarthRadiusKm
  }

  /**
   * Joins `trips` with itself to find return-trip pairs.
   *
   * @param trips input trips; must contain `tpep_pickup_datetime`, `tpep_dropoff_datetime`,
   *              `pickup_latitude`, `pickup_longitude`, `dropoff_latitude`, `dropoff_longitude`
   * @param dist  maximum distance in metres between the matching end points
   * @param spark session, used for the `$"col"` column syntax
   * @return the joined pairs, with A's columns aliased "A" and B's aliased "B". In the output,
   *         the two `tpep_*_datetime` columns hold epoch seconds, and each side carries
   *         `dropoff_lat_bucket` / `pickup_lat_bucket` helper columns; B's copies hold the
   *         neighbour bucket that matched. The returned dataset is marked for caching.
   */
  def compute(trips: Dataset[Row], dist: Double, spark: SparkSession): Dataset[Row] = {
    import spark.implicits._

    // Bucket width in degrees of latitude. The great-circle distance is at least the latitude
    // arc, so two points closer than `dist` metres land in the same or an adjacent bucket.
    val bucketDegrees = dist / MetresPerDegreeLatitude
    val maxDistKm = dist / 1000.0

    // Timestamps become epoch seconds so the gap can be compared against MaxGapSeconds.
    val tripsBuck = trips
      .withColumn("tpep_dropoff_datetime", unix_timestamp($"tpep_dropoff_datetime"))
      .withColumn("tpep_pickup_datetime", unix_timestamp($"tpep_pickup_datetime"))
      .withColumn("dropoff_lat_bucket", floor($"dropoff_latitude" / bucketDegrees))
      .withColumn("pickup_lat_bucket", floor($"pickup_latitude" / bucketDegrees))
      .cache() // read twice: as join side A and as the source of the neighbour expansion

    // Every trip replicated into its 3 x 3 (dropoff, pickup) neighbouring bucket combinations,
    // so a plain equi-join on buckets also finds partners lying in an adjacent bucket. The
    // offsets -1/0/+1 are distinct, so each true (A, B) pair matches exactly one replica.
    // Deliberately not cached: the join consumes it once, and it is 9x the size of the input.
    val tripsBuckNeighbors = tripsBuck
      .withColumn(
        "dropoff_lat_bucket",
        explode(array($"dropoff_lat_bucket" - 1, $"dropoff_lat_bucket", $"dropoff_lat_bucket" + 1)))
      .withColumn(
        "pickup_lat_bucket",
        explode(array($"pickup_lat_bucket" - 1, $"pickup_lat_bucket", $"pickup_lat_bucket" + 1)))

    tripsBuck.as("A").join(
      tripsBuckNeighbors.as("B"),
      // Cheap equi-join keys first, then the exact time and distance predicates.
      $"A.dropoff_lat_bucket" === $"B.pickup_lat_bucket" &&
        $"B.dropoff_lat_bucket" === $"A.pickup_lat_bucket" &&
        $"A.tpep_dropoff_datetime" < $"B.tpep_pickup_datetime" &&
        $"B.tpep_pickup_datetime" - $"A.tpep_dropoff_datetime" < MaxGapSeconds &&
        haversineKm($"A.dropoff_latitude", $"A.dropoff_longitude",
          $"B.pickup_latitude", $"B.pickup_longitude") < maxDistKm &&
        haversineKm($"B.dropoff_latitude", $"B.dropoff_longitude",
          $"A.pickup_latitude", $"A.pickup_longitude") < maxDistKm
    ).cache()
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement