Advertisement
Guest User

Untitled

a guest
Jan 11th, 2019
95
0
Never
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
text 4.58 KB | None | 0 0
  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.spark.sql.functions._
  3. import org.apache.spark.sql.Dataset
  4. import org.apache.spark.sql.Row
  5.  
  /**
   * Detects "return trips" in a taxi-trip table: pairs of trips (a, b) where b starts close to
   * where a ended, ends close to where a started, and is picked up within a fixed time window
   * after a is dropped off.
   *
   * Candidate pairs are found with an equi-join on coarse space/time buckets, so Spark gets
   * equi-join keys instead of having to compare every pair. The exact distance and time
   * predicates are then evaluated on those candidates only.
   */
  object ReturnTrips {

    /** Mean Earth radius in meters; `dist / EarthRadiusMeters` is the search radius in radians. */
    private val EarthRadiusMeters = 6371000.0

    /**
     * Width of a time bucket AND the maximum gap between a's drop-off and b's pick-up.
     * These must be the same value: the neighbour lookup only checks one time bucket back.
     */
    private val TimeWindowSeconds = 8 * 60 * 60

    // Bucket widths, as multiples of the search radius. Matching only looks at adjacent buckets,
    // so a width must be at least the largest coordinate difference between two matching points.
    // A latitude difference never exceeds the central angle, so any factor >= 1 is complete there.
    // A longitude difference for a fixed ground distance grows roughly like 1 / cos(latitude),
    // so a factor of 8 stays complete up to roughly |latitude| <= acos(1/8) ~ 82.8 degrees.
    // NOTE(review): longitude wrap-around at +/-180 degrees is not handled.
    private val LatBucketFactor = 4
    private val LonBucketFactor = 8

    /** Partition count for the cached, enriched trip table that both join sides read. */
    private val NumPartitions = 12

    /** Coordinate columns (input in degrees), in the order their derived columns are appended. */
    private val Coordinates =
      Seq("pickup_latitude", "dropoff_latitude", "pickup_longitude", "dropoff_longitude")

    /**
     * Finds all return-trip pairs.
     *
     * @param trips trips with `pickup_/dropoff_latitude`, `pickup_/dropoff_longitude` (degrees)
     *              and `tpep_pickup_datetime` / `tpep_dropoff_datetime` columns
     * @param dist  maximum distance between matching endpoints; presumably meters, since it is
     *              divided by the Earth radius in meters (TODO confirm with callers)
     * @param spark unused; kept for interface compatibility
     * @return one row per matching pair: the enriched columns of trip `a` followed by those of
     *         trip `b` (coordinates in radians, timestamps as epoch seconds; b's bucket columns
     *         hold the neighbouring bucket value that matched)
     */
    def compute(trips : Dataset[Row], dist : Double, spark : SparkSession) : Dataset[Row] = {
      // Read by both join sides, so caching it pays off. The input itself is not cached:
      // it is never read again after enrichment.
      val enhanced = enrich(trips, dist).repartition(NumPartitions).cache()

      // Deliberately neither repartitioned nor cached: it is 162x larger than `enhanced`, is read
      // exactly once, and the join shuffles it by the bucket keys anyway.
      val candidates = withNeighbourBuckets(enhanced)

      val minCos = Math.cos(dist / EarthRadiusMeters)

      enhanced.as("a").join(candidates.as("b"), joinCondition(minCos))
    }

    /**
     * Converts coordinates to radians, precomputes their sin/cos, converts timestamps to epoch
     * seconds, and adds the space/time bucket columns used as join keys.
     */
    private def enrich(trips: Dataset[Row], dist: Double): Dataset[Row] = {
      val inRadians = Coordinates.foldLeft(trips) { (df, c) =>
        df.withColumn(c, col(c) * Math.PI / 180)
      }

      val withTrig = Coordinates.foldLeft(inRadians) { (df, c) =>
        df.withColumn(s"${c}_sin", sin(col(c))).withColumn(s"${c}_cos", cos(col(c)))
      }

      val withEpochSeconds =
        Seq("tpep_pickup_datetime", "tpep_dropoff_datetime").foldLeft(withTrig) { (df, c) =>
          df.withColumn(c, unix_timestamp(col(c)))
        }

      val withTimeBuckets = Seq("pickup", "dropoff").foldLeft(withEpochSeconds) { (df, side) =>
        df.withColumn(s"${side}_time_bucket",
          floor(col(s"tpep_${side}_datetime") / TimeWindowSeconds))
      }

      Coordinates.foldLeft(withTimeBuckets) { (df, c) =>
        val factor = if (c.endsWith("_latitude")) LatBucketFactor else LonBucketFactor
        df.withColumn(s"${c}_bucket", floor(col(c) / (factor * dist / EarthRadiusMeters)))
      }
    }

    /**
     * Replaces each bucket column with every bucket a partner trip could fall into. Spatial
     * buckets get {b-1, b, b+1}. The pick-up time bucket gets {b-1, b}, because a partner's
     * drop-off always precedes this pick-up. Each input row yields 2 * 3^4 = 162 rows.
     * The values within each array are distinct, so an exact-bucket row on the other side
     * matches at most one of them and no duplicate pairs are produced.
     */
    private def withNeighbourBuckets(enhanced: Dataset[Row]): Dataset[Row] = {
      val timeExpanded = enhanced.withColumn("pickup_time_bucket",
        explode(array(col("pickup_time_bucket") - 1, col("pickup_time_bucket"))))

      Coordinates.foldLeft(timeExpanded) { (df, c) =>
        val bucket = s"${c}_bucket"
        df.withColumn(bucket, explode(array(col(bucket) - 1, col(bucket), col(bucket) + 1)))
      }
    }

    /**
     * Cosine of the central angle between endpoint `p` of alias `x` and endpoint `q` of alias
     * `y`. Uses the spherical law of cosines, with cos(dLon) expanded through the precomputed
     * sin/cos columns.
     */
    private def cosCentralAngle(x: String, p: String, y: String, q: String) = {
      def c(alias: String, side: String, suffix: String) = col(s"$alias.${side}_$suffix")

      c(x, p, "latitude_sin") * c(y, q, "latitude_sin") +
        c(x, p, "latitude_cos") * c(y, q, "latitude_cos") *
          (c(x, p, "longitude_cos") * c(y, q, "longitude_cos") +
            c(x, p, "longitude_sin") * c(y, q, "longitude_sin"))
    }

    /** Full predicate joining trip `a` to candidate return trip `b`. */
    private def joinCondition(minCos: Double) = {
      // Bucket equalities are the equi-join keys. Each of a's endpoints is compared with b's
      // opposite endpoint; b's bucket columns already enumerate the neighbouring buckets.
      val sameBuckets =
        col("a.pickup_latitude_bucket") === col("b.dropoff_latitude_bucket") &&
          col("a.dropoff_latitude_bucket") === col("b.pickup_latitude_bucket") &&
          col("a.pickup_longitude_bucket") === col("b.dropoff_longitude_bucket") &&
          col("a.dropoff_longitude_bucket") === col("b.pickup_longitude_bucket") &&
          col("a.dropoff_time_bucket") === col("b.pickup_time_bucket")

      // b must be picked up strictly after a is dropped off, and strictly within the window.
      val withinWindow =
        col("a.tpep_dropoff_datetime") < col("b.tpep_pickup_datetime") &&
          col("a.tpep_dropoff_datetime") + TimeWindowSeconds > col("b.tpep_pickup_datetime")

      // angle < dist / R  <=>  cos(angle) > cos(dist / R), since cos decreases on [0, pi].
      val endpointsClose =
        cosCentralAngle("a", "pickup", "b", "dropoff") > minCos &&
          cosCentralAngle("b", "pickup", "a", "dropoff") > minCos

      sameBuckets && withinWindow && endpointsClose
    }
  }
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement