Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import org.apache.spark.sql.Column
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * Finds "return trips" in a taxi-trip dataset.
 *
 * A trip `b` is reported as a return trip for a trip `a` when all of the following hold:
 *   - `b` is picked up strictly after `a` is dropped off, and strictly less than 8 hours later;
 *   - `b`'s drop-off lies within `dist` metres of `a`'s pick-up;
 *   - `b`'s pick-up lies within `dist` metres of `a`'s drop-off.
 *
 * Distances are great-circle distances on a sphere of radius 6,371,000 m, evaluated with the
 * spherical law of cosines. To avoid comparing every trip with every other trip, each trip is
 * assigned to coarse time / latitude / longitude buckets; the right-hand side of the join is
 * replicated into its neighbouring buckets so that an ordinary equi-join on bucket ids yields
 * every candidate pair, and the exact time and distance predicates then filter the candidates.
 */
object ReturnTrips {

  /** Sphere radius in metres used to convert a distance on the ground into an angle. */
  private val EarthRadiusMeters: Double = 6371000.0

  /** Maximum gap between a's drop-off and b's pick-up, in seconds; also the time-bucket width. */
  private val TimeWindowSeconds: Int = 8 * 60 * 60

  /** Latitude buckets are this many times wider than the angle subtended by `dist`. */
  private val LatitudeBucketScale: Int = 4

  /**
   * Longitude buckets are this many times wider than the angle subtended by `dist`.
   * NOTE(review): a degree of longitude shrinks with cos(latitude), so looking only at the
   * directly adjacent longitude bucket presumably relies on cos(latitude) >= 1/8 and on trips
   * staying away from the +/-180 degree meridian -- confirm for the data this is run on.
   */
  private val LongitudeBucketScale: Int = 8

  /** Partition count used for both cached intermediate datasets. */
  private val NumPartitions: Int = 12

  /** Coordinate columns, in the order in which their derived columns are appended. */
  private val CoordinateColumns: Seq[String] =
    for {
      axis <- Seq("latitude", "longitude")
      endpoint <- Seq("pickup", "dropoff")
    } yield s"${endpoint}_$axis"

  /** Timestamp columns that are replaced by their epoch-second value. */
  private val TimestampColumns: Seq[String] =
    Seq("tpep_pickup_datetime", "tpep_dropoff_datetime")

  /** Spatial bucket columns that get replicated into their two neighbours on the "b" side. */
  private val SpatialBucketColumns: Seq[String] =
    CoordinateColumns.map(name => s"${name}_bucket")

  /**
   * Computes all (a, b) pairs where b is a return trip for a.
   *
   * @param trips trips with columns `pickup_latitude`, `pickup_longitude`, `dropoff_latitude`,
   *              `dropoff_longitude` (treated as degrees: they are multiplied by pi / 180) and
   *              `tpep_pickup_datetime`, `tpep_dropoff_datetime` (converted with
   *              `unix_timestamp`)
   * @param dist  maximum distance in metres between the matched end points
   * @param spark the active session; not used by the computation, kept for the caller's API
   * @return the join of the enriched trips (alias `a`) with their return trips (alias `b`).
   *         Every row carries all columns of both sides, including the helper columns added
   *         here; on the `b` side the bucket columns hold the replicated (exploded) values.
   */
  def compute(trips: Dataset[Row], dist: Double, spark: SparkSession): Dataset[Row] = {
    val latitudeBucketWidth = LatitudeBucketScale * dist / EarthRadiusMeters
    val longitudeBucketWidth = LongitudeBucketScale * dist / EarthRadiusMeters
    // Two points are closer than `dist` exactly when the cosine of the angle between them
    // exceeds the cosine of the angle subtended by `dist`.
    val cosMaxAngle = Math.cos(dist / EarthRadiusMeters)

    // The caller's dataset is deliberately not cached: it is scanned once to build the
    // enriched dataset below, and that one is cached instead.
    val inRadians = CoordinateColumns.foldLeft(trips) { (df, name) =>
      df.withColumn(name, col(name) * Math.PI / 180)
    }

    // Pre-compute sin/cos of every coordinate once per trip so the join predicate only
    // needs multiplications and additions.
    val withTrig = CoordinateColumns.foldLeft(inRadians) { (df, name) =>
      df.withColumn(s"${name}_sin", sin(col(name)))
        .withColumn(s"${name}_cos", cos(col(name)))
    }

    val withEpochSeconds = TimestampColumns.foldLeft(withTrig) { (df, name) =>
      df.withColumn(name, unix_timestamp(col(name)))
    }

    val tripsEnhanced = withEpochSeconds
      .withColumn("pickup_time_bucket", floor(col("tpep_pickup_datetime") / TimeWindowSeconds))
      .withColumn("dropoff_time_bucket", floor(col("tpep_dropoff_datetime") / TimeWindowSeconds))
      .withColumn("pickup_latitude_bucket", floor(col("pickup_latitude") / latitudeBucketWidth))
      .withColumn("dropoff_latitude_bucket", floor(col("dropoff_latitude") / latitudeBucketWidth))
      .withColumn("pickup_longitude_bucket", floor(col("pickup_longitude") / longitudeBucketWidth))
      .withColumn("dropoff_longitude_bucket", floor(col("dropoff_longitude") / longitudeBucketWidth))
      .repartition(NumPartitions)
      .cache()

    // Run an action right away so the enrichment is evaluated before the join is planned.
    tripsEnhanced.take(1)

    // b's pick-up happens at most one time window after a's drop-off, so b's pick-up bucket
    // is either a's drop-off bucket or the one after it: replicate b into (bucket - 1, bucket).
    val withTimeNeighbours = tripsEnhanced.withColumn(
      "pickup_time_bucket",
      explode(array(col("pickup_time_bucket") - 1, col("pickup_time_bucket")))
    )

    // Spatially a match can sit in the same bucket or in either adjacent one.
    val withAllNeighbours = SpatialBucketColumns.foldLeft(withTimeNeighbours)(withAdjacentBuckets)

    val tripsWithNeighbourBuckets = withAllNeighbours
      .repartition(NumPartitions)
      .cache()

    // Equi-join keys: cheap candidate generation on bucket ids.
    val sameBuckets =
      col("a.pickup_latitude_bucket") === col("b.dropoff_latitude_bucket") &&
        col("a.dropoff_latitude_bucket") === col("b.pickup_latitude_bucket") &&
        col("a.pickup_longitude_bucket") === col("b.dropoff_longitude_bucket") &&
        col("a.dropoff_longitude_bucket") === col("b.pickup_longitude_bucket") &&
        col("a.dropoff_time_bucket") === col("b.pickup_time_bucket")

    // Exact predicates applied to the candidates.
    val withinTimeWindow =
      col("a.tpep_dropoff_datetime") < col("b.tpep_pickup_datetime") &&
        col("a.tpep_dropoff_datetime") + TimeWindowSeconds > col("b.tpep_pickup_datetime")

    val endpointsClose =
      (cosAngularDistance("a.pickup", "b.dropoff") > cosMaxAngle) &&
        (cosAngularDistance("b.pickup", "a.dropoff") > cosMaxAngle)

    tripsEnhanced.as("a").join(
      tripsWithNeighbourBuckets.as("b"),
      sameBuckets && withinTimeWindow && endpointsClose
    )
  }

  /** Replaces `bucket` by one row for each of (bucket - 1, bucket, bucket + 1). */
  private def withAdjacentBuckets(df: Dataset[Row], bucket: String): Dataset[Row] =
    df.withColumn(bucket, explode(array(col(bucket) - 1, col(bucket), col(bucket) + 1)))

  /**
   * Cosine of the angle between two points, via the spherical law of cosines, built from the
   * pre-computed `<prefix>_latitude_sin/cos` and `<prefix>_longitude_sin/cos` columns.
   * cos(lon1 - lon2) is expanded as cos(lon1) * cos(lon2) + sin(lon1) * sin(lon2).
   *
   * @param from alias-qualified end point prefix, e.g. `"a.pickup"`
   * @param to   alias-qualified end point prefix, e.g. `"b.dropoff"`
   */
  private def cosAngularDistance(from: String, to: String): Column =
    col(s"${from}_latitude_sin") * col(s"${to}_latitude_sin") +
      col(s"${from}_latitude_cos") * col(s"${to}_latitude_cos") *
        (col(s"${from}_longitude_cos") * col(s"${to}_longitude_cos") +
          col(s"${from}_longitude_sin") * col(s"${to}_longitude_sin"))
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement