Advertisement
Guest User

Untitled

a guest
Jun 25th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.30 KB | None | 0 0
  1. import org.apache.spark.sql.types._
  2. import org.apache.spark.sql.Column
  3. import org.apache.spark.sql.Dataset
  4. import org.apache.spark.sql.Row
  5.  
  6. import org.apache.spark.sql.functions
  7.  
  8.  
  9. val schema = StructType(Array(
  10. StructField("VendorID", DataTypes.StringType,false),
  11. StructField("tpep_pickup_datetime", DataTypes.TimestampType,false),
  12. StructField("tpep_dropoff_datetime", DataTypes.TimestampType,false),
  13. StructField("passenger_count", DataTypes.IntegerType,false),
  14. StructField("trip_distance", DataTypes.DoubleType,false),
  15. StructField("pickup_longitude", DataTypes.DoubleType,false),
  16. StructField("pickup_latitude", DataTypes.DoubleType,false),
  17. StructField("RatecodeID", DataTypes.IntegerType,false),
  18. StructField("store_and_fwd_flag", DataTypes.StringType,false),
  19. StructField("dropoff_longitude", DataTypes.DoubleType,false),
  20. StructField("dropoff_latitude", DataTypes.DoubleType,false),
  21. StructField("payment_type", DataTypes.IntegerType,false),
  22. StructField("fare_amount", DataTypes.DoubleType,false),
  23. StructField("extra", DataTypes.DoubleType,false),
  24. StructField("mta_tax", DataTypes.DoubleType,false),
  25. StructField("tip_amount", DataTypes.DoubleType,false),
  26. StructField("tolls_amount", DataTypes.DoubleType,false),
  27. StructField("improvement_surcharge", DataTypes.DoubleType,false),
  28. StructField("total_amount", DataTypes.DoubleType, false)
  29. ))
  30.  
  31. val tripsDF = spark.read.schema(schema).option("header", true).csv("yellow_tripdata_2016-01.csv")
  32. val trips = tripsDF.where($"pickup_longitude" =!= 0 && $"pickup_latitude" =!= 0 && $"dropoff_longitude" =!= 0 && $"dropoff_latitude" =!= 0).cache()
  33.  
  34. // STEP 1: Find max latitude to use it to create buckets later, since longitude is depending on that
  35.  
  36. val max_latitude_row = trips.agg(max($"pickup_latitude"), max($"dropoff_latitude")).first()
  37. val max_latitude = math.max(max_latitude_row.getDouble(0), max_latitude_row.getDouble(1))
  38.  
  39. // STEP 2: Create buckets
  40.  
  41. val EARTH_R = 6371
  42. val CHUNK_SIZE = 0.101
  43.  
  44. def calcDistance(latitude1: Column, longitude1: Column, latitude2: Column, longitude2: Column): Column = {
  45. asin(sqrt(pow((toRadians(latitude2) - toRadians(latitude1)) / 2, 2.0) + cos(toRadians(latitude1)) * cos(toRadians(latitude2)) * pow((toRadians(longitude2) - toRadians(longitude1)) / 2, 2.0))) * 2 * EARTH_R
  46. }
  47.  
  48. def calcLatitudeDelta(): Double = {
  49. math.toDegrees(2 * math.asin(math.sqrt(math.pow(CHUNK_SIZE / (2 * EARTH_R), 2.0))))
  50. }
  51.  
  52. def calcLongitudeDelta(at_latitude: Double): Double = {
  53. math.toDegrees(2 * math.asin(math.sqrt(math.pow(CHUNK_SIZE / (2 * EARTH_R), 2.0) / math.pow(math.cos(math.toRadians(at_latitude)), 2.0))))
  54. }
  55.  
  56. val latitude_bucket_delta = calcLatitudeDelta()
  57. val longitude_bucket_delta = calcLongitudeDelta(max_latitude)
  58.  
  59. val bucket_trips = trips.withColumn("pickup_latitude_bucket", floor($"pickup_latitude" / latitude_bucket_delta)).withColumn("pickup_longitude_bucket", floor($"pickup_longitude" / longitude_bucket_delta)).withColumn("dropoff_latitude_bucket", floor($"dropoff_latitude" / latitude_bucket_delta)).withColumn("dropoff_longitude_bucket", floor($"dropoff_longitude" / longitude_bucket_delta))
  60.  
  61. // STEP 3: Explode Data
  62.  
  63. def explodeLatitude(ds: Dataset[Row]): Dataset[Row] = {
  64. val explZero = ds.withColumn("new_lat", $"dropoff_latitude_bucket").drop($"dropoff_latitude_bucket").withColumnRenamed("new_lat", "dropoff_latitude_bucket")
  65. val explMinus = ds.withColumn("new_lat", $"dropoff_latitude_bucket" - 1).drop($"dropoff_latitude_bucket").withColumnRenamed("new_lat", "dropoff_latitude_bucket")
  66. val explPlus = ds.withColumn("new_lat", $"dropoff_latitude_bucket" + 1).drop($"dropoff_latitude_bucket").withColumnRenamed("new_lat", "dropoff_latitude_bucket")
  67. explZero.union(explMinus).union(explPlus)
  68. }
  69.  
  70. def explodeLongitude(ds: Dataset[Row]): Dataset[Row] = {
  71. val explZero = ds.withColumn("new_long", $"dropoff_longitude_bucket").drop($"dropoff_longitude_bucket").withColumnRenamed("new_long", "dropoff_longitude_bucket")
  72. val explMinus = ds.withColumn("new_long", $"dropoff_longitude_bucket" - 1).drop($"dropoff_longitude_bucket").withColumnRenamed("new_long", "dropoff_longitude_bucket")
  73. val explPlus = ds.withColumn("new_long", $"dropoff_longitude_bucket" + 1).drop($"dropoff_longitude_bucket").withColumnRenamed("new_long", "dropoff_longitude_bucket")
  74. explZero.union(explMinus).union(explPlus)
  75. }
  76.  
  77. val exploded_trips = explodeLongitude(explodeLatitude(bucket_trips))
  78.  
  79. // STEP 4: Perform join
  80.  
  81. val result = exploded_trips.as("trip_there").join(exploded_trips.as("trip_back"), (
  82. $"trip_there.dropoff_latitude_bucket" === $"trip_back.pickup_latitude_bucket"
  83. && $"trip_there.dropoff_longitude_bucket" === $"trip_back.pickup_longitude_bucket"
  84. && $"trip_back.dropoff_latitude_bucket" === $"trip_there.pickup_latitude_bucket"
  85. && $"trip_back.dropoff_longitude_bucket" === $"trip_there.pickup_longitude_bucket"
  86.  
  87. && ($"trip_there.tpep_dropoff_datetime".cast("long") < $"trip_back.tpep_pickup_datetime".cast("long"))
  88. && (($"trip_there.tpep_dropoff_datetime".cast("long") + (8 * 60 * 60)) > $"trip_back.tpep_pickup_datetime".cast("long"))
  89. && (calcDistance($"trip_there.dropoff_latitude", $"trip_there.dropoff_longitude", $"trip_back.pickup_latitude", $"trip_back.pickup_longitude") < 0.1)
  90. && (calcDistance($"trip_back.dropoff_latitude", $"trip_back.dropoff_longitude", $"trip_there.pickup_latitude", $"trip_there.pickup_longitude") < 0.1)
  91. ))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement