Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- import org.apache.spark.sql.types._
- import org.apache.spark.sql.functions._
- import org.apache.spark.sql.Column
- import org.apache.spark.sql.Dataset
- import org.apache.spark.sql.Row
- import org.apache.spark.sql.functions.udf
object ReturnTrips {

  /**
   * Finds candidate "return trip" pairs in a NYC-taxi-style dataset.
   *
   * A pair (A, B) qualifies when:
   *   - B is picked up after A is dropped off,
   *   - B's pickup happens within 8 hours of A's dropoff,
   *   - B's dropoff is within `dist` km of A's pickup, and
   *   - B's pickup is within `dist` km of A's dropoff.
   *
   * @param trips dataset with columns `tpep_pickup_datetime`, `tpep_dropoff_datetime`,
   *              `pickup_latitude`/`pickup_longitude`, `dropoff_latitude`/`dropoff_longitude`
   * @param dist  maximum distance in kilometres between matching endpoints
   * @param spark active session (needed for the `$` column interpolator)
   * @return the self-joined dataset of qualifying (tripsA, tripsB) row pairs
   */
  def compute(trips: Dataset[Row], dist: Double, spark: SparkSession): Dataset[Row] = {
    import spark.implicits._

    // Haversine great-circle distance in kilometres between two (lat, lon) points.
    // Note the argument order: both latitudes first, then both longitudes.
    def distance(latitudeA: Double, latitudeB: Double, longitudeA: Double, longitudeB: Double): Double = {
      val toRad = Math.PI / 180
      val dlat = (latitudeA - latitudeB) * toRad
      val dlon = (longitudeB - longitudeA) * toRad
      val earthRadiusKm = 6371
      val a = Math.sin(dlat / 2) * Math.sin(dlat / 2) +
        Math.cos(latitudeA * toRad) * Math.cos(latitudeB * toRad) *
          Math.sin(dlon / 2) * Math.sin(dlon / 2)
      val c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
      earthRadiusKm * c
    }

    val distanceUDF = udf[Double, Double, Double, Double, Double](distance)

    // Earlier bucketing experiment kept for reference:
    // val dateBuck = trips.withColumn("date_bucket", floor($"tripsB.tpep_pickup_datetime" / 0.001)).cache()
    // val dateBuckNeighbors = dateBuck.withColumn("date_bucket", explode(array($"date_bucket" - 1, $"date_bucket", $"date_bucket" + 1)))

    // unix_timestamp() yields SECONDS since the epoch, so the time window must be
    // expressed in seconds: 8h = 28800s. (The original 8 * 60 * 60 * 1000 was
    // milliseconds, silently widening the window to ~8000 hours.)
    val eightHoursSecs = 8L * 60 * 60

    // Self-join: the whole predicate must live inside the join(...) call —
    // a misplaced ')' previously cut the condition off after the time check.
    val res = trips.as("tripsA").join(trips.as("tripsB"),
      $"tripsA.tpep_dropoff_datetime" < $"tripsB.tpep_pickup_datetime"
        && unix_timestamp($"tripsA.tpep_dropoff_datetime") + eightHoursSecs > unix_timestamp($"tripsB.tpep_pickup_datetime")
        && distanceUDF($"tripsB.dropoff_latitude", $"tripsA.pickup_latitude", $"tripsB.dropoff_longitude", $"tripsA.pickup_longitude") < dist
        && distanceUDF($"tripsA.dropoff_latitude", $"tripsB.pickup_latitude", $"tripsA.dropoff_longitude", $"tripsB.pickup_longitude") < dist)

    // explain() prints the plan itself and returns Unit; wrapping it in
    // println(...) only printed "()".
    res.explain()
    res
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement