SHARE
TWEET

Untitled

a guest Jan 11th, 2019 73 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.spark.sql.functions._
  3. import org.apache.spark.sql.Dataset
  4. import org.apache.spark.sql.Row
  5.  
  /**
   * Self-join over a trips dataset that pairs every trip `a` with the trips `b`
   * that look like its "return trip":
   *
   *  - `b` is picked up strictly after `a` is dropped off, and less than
   *    [[WindowHours]] hours later;
   *  - `b`'s drop-off lies within `dist` of `a`'s pick-up;
   *  - `b`'s pick-up lies within `dist` of `a`'s drop-off.
   *
   * To avoid a full cross product the trips are first assigned to coarse time
   * and space buckets; one side of the join is then replicated into its
   * neighbouring buckets so that an equi-join on bucket ids finds every
   * candidate pair, and the exact conditions are applied on top.
   */
  object ReturnTrips {

    // Sphere radius used to turn `dist` into an angle.
    // NOTE(review): presumably the Earth's mean radius in metres, which would
    // make `dist` a distance in metres - confirm against the caller.
    private val EarthRadiusMeters = 6371000.0

    // Maximum delay, in hours, between a drop-off and the matching pick-up.
    // Also the width of one time bucket.
    private val WindowHours = 8

    // Spatial bucket widths, expressed as multiples of the angle `dist / EarthRadiusMeters`.
    // NOTE(review): the longitude span of a fixed ground distance grows roughly
    // with 1 / cos(latitude), so a factor of 8 is only enough while
    // cos(latitude) >= 1/8 - confirm this holds for the data being processed.
    private val LatitudeBucketFactor = 4
    private val LongitudeBucketFactor = 8

    // Partition count used for both cached intermediate datasets.
    private val Partitions = 12

    // Coordinate columns, in the order in which their derived columns are appended.
    private val CoordinateColumns =
      Seq("pickup_latitude", "dropoff_latitude", "pickup_longitude", "dropoff_longitude")

    private val TimestampColumns = Seq("tpep_pickup_datetime", "tpep_dropoff_datetime")

    private val SpatialBucketColumns = Seq(
      "pickup_latitude_bucket",
      "dropoff_latitude_bucket",
      "pickup_longitude_bucket",
      "dropoff_longitude_bucket"
    )

    /**
     * Finds all (trip, return trip) pairs in `trips`.
     *
     * Reads the columns `pickup_latitude`, `pickup_longitude`, `dropoff_latitude`,
     * `dropoff_longitude`, `tpep_pickup_datetime` and `tpep_dropoff_datetime`.
     * The coordinates are multiplied by pi / 180, i.e. they are assumed to be
     * stored in degrees.
     *
     * Side effects: `cache()` is called on `trips` itself, and one action
     * (`take(1)`) is run on an intermediate dataset before the join is built.
     *
     * @param trips the trips to match against each other
     * @param dist  maximum distance between a pick-up and the matching drop-off,
     *              in the same unit as [[EarthRadiusMeters]]
     * @param spark not used by this method; kept so the signature stays stable
     * @return one row per matching pair, holding the columns of both sides of the
     *         join (aliased `a` and `b`). In the result the coordinate columns are
     *         in radians, the two datetime columns are Unix timestamps in seconds,
     *         and the helper `*_sin`, `*_cos` and `*_bucket` columns are present.
     */
    def compute(trips : Dataset[Row], dist : Double, spark : SparkSession) : Dataset[Row] = {

      val latitudeBucketWidth = LatitudeBucketFactor * dist / EarthRadiusMeters
      val longitudeBucketWidth = LongitudeBucketFactor * dist / EarthRadiusMeters

      // Degrees -> radians, replacing the original columns in place.
      val inRadians = CoordinateColumns.foldLeft[Dataset[Row]](trips.cache()) { (df, name) =>
        df.withColumn(name, col(name) * Math.PI / 180)
      }

      // Precompute sin/cos of every coordinate once, so the join predicate only
      // has to multiply and add.
      val withTrig = CoordinateColumns.foldLeft(inRadians) { (df, name) =>
        df.withColumn(s"${name}_sin", sin(col(name)))
          .withColumn(s"${name}_cos", cos(col(name)))
      }

      // Datetimes -> seconds since the epoch, replacing the original columns in place.
      val withEpochSeconds = TimestampColumns.foldLeft(withTrig) { (df, name) =>
        df.withColumn(name, unix_timestamp(col(name)))
      }

      val tripsEnhanced =
        withEpochSeconds
          .withColumn("pickup_time_bucket", floor(col("tpep_pickup_datetime") / 60 / 60 / WindowHours))
          .withColumn("dropoff_time_bucket", floor(col("tpep_dropoff_datetime") / 60 / 60 / WindowHours))
          .withColumn("pickup_latitude_bucket", floor(col("pickup_latitude") / latitudeBucketWidth))
          .withColumn("dropoff_latitude_bucket", floor(col("dropoff_latitude") / latitudeBucketWidth))
          .withColumn("pickup_longitude_bucket", floor(col("pickup_longitude") / longitudeBucketWidth))
          .withColumn("dropoff_longitude_bucket", floor(col("dropoff_longitude") / longitudeBucketWidth))
          .repartition(Partitions)
          .cache()

      // Runs an action on the cached dataset and discards the result.
      // NOTE(review): presumably here to start evaluation/caching eagerly - confirm
      // it is still wanted.
      tripsEnhanced.take(1)

      // A pick-up that follows a drop-off by less than one bucket width falls into
      // the drop-off's time bucket or the next one, so each candidate is listed
      // under its own pick-up bucket and the previous one.
      val withTimeNeighbours =
        tripsEnhanced.withColumn("pickup_time_bucket", explode(
          array(col("pickup_time_bucket") - 1, col("pickup_time_bucket"))
        ))

      // Two points closer than one bucket width are at most one bucket apart, so
      // each candidate is listed under its own spatial bucket and both neighbours.
      val returnCandidates =
        SpatialBucketColumns.foldLeft(withTimeNeighbours) { (df, name) =>
          df.withColumn(name, explode(
            array(col(name) - 1, col(name), col(name) + 1)
          ))
        }
          .repartition(Partitions)
          .cache()

      // cos is decreasing on [0, pi], so "angle < dist / R" is tested as
      // "cos(angle) > cos(dist / R)", which avoids an acos per candidate pair.
      val minCosine = Math.cos(dist / EarthRadiusMeters)

      // Spherical law of cosines: cosine of the central angle between the pick-up
      // of `pickupSide` and the drop-off of `dropoffSide`, with cos(delta longitude)
      // expanded into the precomputed sin/cos columns.
      def cosCentralAngle(pickupSide: String, dropoffSide: String) =
        col(s"$pickupSide.pickup_latitude_sin") * col(s"$dropoffSide.dropoff_latitude_sin") +
          col(s"$pickupSide.pickup_latitude_cos") * col(s"$dropoffSide.dropoff_latitude_cos") *
            (col(s"$pickupSide.pickup_longitude_cos") * col(s"$dropoffSide.dropoff_longitude_cos") +
              col(s"$pickupSide.pickup_longitude_sin") * col(s"$dropoffSide.dropoff_longitude_sin"))

      tripsEnhanced.as("a").join(returnCandidates.as("b"),
          // Equality on bucket ids narrows the join to nearby candidates.
          col("a.pickup_latitude_bucket") === col("b.dropoff_latitude_bucket") &&
          col("a.dropoff_latitude_bucket") === col("b.pickup_latitude_bucket") &&
          col("a.pickup_longitude_bucket") === col("b.dropoff_longitude_bucket") &&
          col("a.dropoff_longitude_bucket") === col("b.pickup_longitude_bucket") &&
          col("a.dropoff_time_bucket") === col("b.pickup_time_bucket") &&

          // Exact time window: b starts after a ends, less than WindowHours later.
          col("a.tpep_dropoff_datetime") < col("b.tpep_pickup_datetime") &&
          col("a.tpep_dropoff_datetime") + WindowHours * 60 * 60 > col("b.tpep_pickup_datetime") &&

          // Exact distance checks in both directions.
          cosCentralAngle("a", "b") > minCosine &&
          cosCentralAngle("b", "a") > minCosine
      )
    }
  }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top