Advertisement
Guest User

Untitled

a guest
Jan 11th, 2019
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 7.63 KB | None | 0 0
  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.SparkContext._
  4. import org.apache.spark.SparkConf
  5. import org.apache.spark.sql.types._
  6. import org.apache.spark.sql.functions._
  7. import org.apache.spark.sql.Column
  8. import org.apache.spark.sql.Dataset
  9. import org.apache.spark.sql.Row
  10. import org.apache.spark.sql.functions.udf
  11.  
  12.  
  /**
    * Finds pairs of taxi trips (A, B) in which B looks like a "return trip" for A.
    *
    * A pair is kept when all of the following hold:
    *  - B's pick-up happens strictly after A's drop-off and less than 8 hours later;
    *  - B's pick-up point is closer than `dist` to A's drop-off point;
    *  - B's drop-off point is closer than `dist` to A's pick-up point.
    *
    * Distances are great-circle distances computed with the haversine formula on a
    * sphere of radius [[EarthRadiusKm]].
    */
  object ReturnTrips {

    /** Sphere radius, in kilometres, used by the haversine formula. */
    private val EarthRadiusKm = 6371.0

    /** Length of one degree of latitude, in metres, on a sphere of radius [[EarthRadiusKm]]. */
    private val MetersPerDegreeLatitude = (Math.PI * EarthRadiusKm * 1000.0) / 180.0

    /** Upper bound (exclusive) on the gap between A's drop-off and B's pick-up: 8 hours, in seconds. */
    private val MaxGapSeconds = 8 * 60 * 60

    /**
      * Haversine great-circle distance between two points, expressed as a Spark column.
      *
      * @param latitudeA  latitude of the first point, in degrees
      * @param longitudeA longitude of the first point, in degrees
      * @param latitudeB  latitude of the second point, in degrees
      * @param longitudeB longitude of the second point, in degrees
      * @return a column holding the distance in kilometres
      */
    private def haversineKm(latitudeA: Column, longitudeA: Column,
                            latitudeB: Column, longitudeB: Column): Column = {
      val dLat = radians(latitudeA - latitudeB)
      val dLon = radians(longitudeB - longitudeA)
      val a = sin(dLat / 2) * sin(dLat / 2) +
        cos(radians(latitudeA)) * cos(radians(latitudeB)) * sin(dLon / 2) * sin(dLon / 2)
      atan2(sqrt(a), sqrt(-a + 1)) * 2 * EarthRadiusKm
    }

    /** Expands a bucket index into one row per bucket in {index - 1, index, index + 1}. */
    private def withNeighbours(bucket: Column): Column =
      explode(array(bucket - 1, bucket, bucket + 1))

    /**
      * Self-joins `trips` to produce every (A, B) pair satisfying the return-trip conditions.
      *
      * The columns read from `trips` are `tpep_pickup_datetime`, `tpep_dropoff_datetime`,
      * `pickup_latitude`, `pickup_longitude`, `dropoff_latitude` and `dropoff_longitude`.
      *
      * In the result, both datetime columns hold Unix timestamps in seconds (they are
      * overwritten with `unix_timestamp(...)`), and two helper columns,
      * `dropoff_lat_bucket` and `pickup_lat_bucket`, are present on each side.
      *
      * @param trips trips to search; used as both side A and side B of the join
      * @param dist  distance threshold; treated as metres (it is divided by 1000 before being
      *              compared with kilometre distances)
      * @param spark session whose implicits provide the `$"column"` syntax
      * @return the inner join of `trips` (alias "A") with its neighbour-expanded copy
      *         (alias "B"), cached
      */
    def compute(trips : Dataset[Row], dist : Double, spark : SparkSession) : Dataset[Row] = {

      import spark.implicits._

      // Width of one latitude bucket in degrees: the latitude span that corresponds to `dist`.
      // Two points closer than `dist` therefore always fall into the same or adjacent buckets.
      val bucketWidthDegrees = dist / MetersPerDegreeLatitude
      val distKm = dist / 1000.0

      // Side A: timestamps as epoch seconds plus a latitude bucket per end point.
      // Cached because it feeds both sides of the join below.
      val bucketed = trips
        .withColumn("tpep_dropoff_datetime", unix_timestamp($"tpep_dropoff_datetime"))
        .withColumn("tpep_pickup_datetime", unix_timestamp($"tpep_pickup_datetime"))
        .withColumn("dropoff_lat_bucket", floor($"dropoff_latitude" / bucketWidthDegrees))
        .withColumn("pickup_lat_bucket", floor($"pickup_latitude" / bucketWidthDegrees))
        .cache()

      // Side B: every trip replicated into the neighbouring buckets (3 x 3 = 9 rows per trip)
      // so that an equality join on bucket indices also catches matches across a bucket border.
      // Deliberately not cached: it is consumed exactly once, by the join below.
      val bucketedWithNeighbours = bucketed
        .withColumn("dropoff_lat_bucket", withNeighbours($"dropoff_lat_bucket"))
        .withColumn("pickup_lat_bucket", withNeighbours($"pickup_lat_bucket"))

      // Bucket equalities prune candidates cheaply; the time window and the exact haversine
      // distances then decide which candidate pairs are real return trips.
      val returnTripCondition =
        $"A.dropoff_lat_bucket" === $"B.pickup_lat_bucket" &&
          $"B.dropoff_lat_bucket" === $"A.pickup_lat_bucket" &&
          $"A.tpep_dropoff_datetime" < $"B.tpep_pickup_datetime" &&
          $"B.tpep_pickup_datetime" - $"A.tpep_dropoff_datetime" < MaxGapSeconds &&
          haversineKm($"A.dropoff_latitude", $"A.dropoff_longitude",
            $"B.pickup_latitude", $"B.pickup_longitude") < distKm &&
          haversineKm($"B.dropoff_latitude", $"B.dropoff_longitude",
            $"A.pickup_latitude", $"A.pickup_longitude") < distKm

      bucketed.as("A")
        .join(bucketedWithNeighbours.as("B"), returnTripCondition)
        .cache()
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement