Advertisement
Guest User

Untitled

a guest
Jan 2nd, 2019
145
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.93 KB | None | 0 0
  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.SparkContext._
  4. import org.apache.spark.SparkConf
  5. import org.apache.spark.sql.types._
  6. import org.apache.spark.sql.functions._
  7. import org.apache.spark.sql.Column
  8. import org.apache.spark.sql.Dataset
  9. import org.apache.spark.sql.Row
  10. import org.apache.spark.sql.functions.udf
  11.  
  12.  
  13. object ReturnTrips {
  14.   def compute(trips : Dataset[Row], dist : Double, spark : SparkSession) : Dataset[Row] = {
  15.  
  16.     import spark.implicits._
  17.  
  18.     def distance(latitudeA: Double, latitudeB: Double, longitudeA: Double, longitudeB: Double): Double = {
  19.       val dlat = (latitudeA - latitudeB) * (Math.PI/180)
  20.       val dlon = (longitudeB - longitudeA) * (Math.PI/180)
  21.       val r = 6371
  22.       val a = (Math.sin(dlat/2) * Math.sin(dlat/2)
  23.         + Math.cos(latitudeA * (Math.PI/180)) * Math.cos(latitudeB * (Math.PI/180)) * Math.sin(dlon/2) * Math.sin(dlon/2))
  24.       val c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a))
  25.       r * c
  26.     }
  27.     val distanceUDF = udf[Double, Double, Double, Double, Double](distance)
  28.  
  29. //    val dateBuck = trips.withColumn("date_bucket", floor($"tripsB.tpep_pickup_datetime" / 0.001)).cache()
  30. //    val dateBuckNeighbors = dateBuck.withColumn("date_bucket", explode(array($"date_bucket" - 1, $"date_bucket", $"date_bucket" + 1)))
  31.  
  32.  
  33.     val hours = 8 * 60 * 60 * 1000
  34.     val res = trips.as("tripsA").join(trips.as("tripsB"),
  35.            $"tripsA.tpep_dropoff_datetime" < $"tripsB.tpep_pickup_datetime"
  36.     && unix_timestamp($"tripsA.tpep_dropoff_datetime")  + hours > unix_timestamp($"tripsB.tpep_pickup_datetime"))
  37.     &&  distanceUDF($"tripsB.dropoff_latitude", $"tripsA.pickup_latitude", $"tripsB.dropoff_longitude", $"tripsA.pickup_longitude") < dist
  38.     && distanceUDF($"tripsA.dropoff_latitude", $"tripsB.pickup_latitude", $"tripsA.dropoff_longitude", $"tripsB.pickup_longitude") < dist
  39.     println(res.explain())
  40.     res
  41.   }
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement