Guest User

Untitled

a guest
Feb 21st, 2018
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.89 KB | None | 0 0
  1. package osmesa.analytics.oneoffs
  2.  
  3. import osmesa.analytics._
  4.  
  5. import cats.implicits._
  6. import com.monovore.decline._
  7. import geotrellis.vector._
  8. import geotrellis.vector.io._
  9. import geotrellis.vector.io.json._
  10. import geotrellis.spark.util.KryoWrapper
  11. import org.apache.spark.sql._
  12. import org.apache.spark.sql.functions._
  13. import spray.json._
  14.  
  15. import java.sql.Timestamp
  16. import java.time._
  17. import java.time.format._
  18. import scala.util._
  19.  
  /**
   * Command-line entry point for the `export-snapshot` job.
   *
   * Collects the history URI, boundary URI, snapshot time and output URI from the
   * command line, parses the time with `LocalDateTime.parse`, and hands everything to
   * `ExportSnapshot.run`. If the time string cannot be parsed, a message is printed and
   * the process exits with status 1; any other failure from the parse is rethrown.
   */
  object ExportSnapshotCommand extends CommandApp(
    name = "export-snapshot",
    header = "Exports a snapshot of OSM based on history and changeset ORCs, a bounding box, and a point in time.",
    main = {
      val historyOpt = Opts.option[String]("history", help = "URI to the history ORC file to process.")
      val boundaryOpt = Opts.option[String]("boundary", help = "URI to geojson of boundary")
      val timeOpt = Opts.option[String]("time", help = "ISO 8601 format of snapshot date.")
      val snapshotPathOpt = Opts.option[String]("snapshotPath", help = "URI for output snapshot ORC")

      (historyOpt, boundaryOpt, timeOpt, snapshotPathOpt).mapN {
        (historyUri, boundaryUri, timeStr, snapshotUri) =>
          Try(LocalDateTime.parse(timeStr)) match {
            case Success(snapshotTime) =>
              ExportSnapshot.run(historyUri, boundaryUri, snapshotTime, snapshotUri)

            // A malformed date is a user error: report it and exit instead of dumping a stack trace.
            case Failure(e: DateTimeParseException) =>
              println(s"Could not parse date string '${timeStr}'. Make sure it's in ISO 8601 format. Parse error: $e")
              sys.exit(1)

            // Anything else is unexpected; let it propagate.
            case Failure(unexpected) =>
              throw unexpected
          }
      }
    }
  )
  52.  
  /**
   * Builds a snapshot of OSM elements that touch a boundary, as of a point in time,
   * from an OSM history ORC.
   */
  object ExportSnapshot {

    /**
     * Reads the entire file at `path` into a string, closing the source afterwards.
     *
     * NOTE(review): `scala.io.Source.fromFile` treats `path` as a local file path, while the
     * CLI describes the boundary as a URI — confirm remote URIs are not expected here.
     *
     * @param path path of the file to read
     * @return the full file contents
     */
    def readFile(path: String): String = {
      val src = scala.io.Source.fromFile(path)
      try {
        src.mkString
      } finally {
        src.close()
      }
    }

    /**
     * Reads the boundary GeoJSON and the history ORC, builds the snapshot DataFrame and
     * writes it as ORC to `snapshotUri`. The Spark session is always stopped at the end.
     *
     * @param historyUri  location of the history ORC to read
     * @param boundaryUri location of the boundary GeoJSON (Polygon, MultiPolygon, or a
     *                    FeatureCollection of those)
     * @param time        snapshot time; only history rows at or before it are considered
     * @param snapshotUri location to write the snapshot ORC to
     * @throws IllegalArgumentException if no polygon could be parsed from the boundary file
     */
    def run(
      historyUri: String,
      boundaryUri: String,
      time: LocalDateTime,
      snapshotUri: String
    ): Unit = {

      val mp = {
        // Read the file once, and outside of any Try, so that an unreadable boundary file
        // fails loudly instead of silently producing an empty boundary.
        val boundaryJson = readFile(boundaryUri)

        // Allow the GeoJSON to be a Polygon, MultiPolygon, or FeatureCollection with
        // Polygons or MultiPolygons. Each parse attempt that does not match contributes nothing.
        val polygons: List[Polygon] =
          Try(boundaryJson.parseGeoJson[Polygon]).map(List(_)).getOrElse(List[Polygon]()) :::
          Try(boundaryJson.parseGeoJson[MultiPolygon]).map(_.polygons.toList).getOrElse(List[Polygon]()) :::
          Try(boundaryJson.parseGeoJson[JsonFeatureCollection]).map { collection =>
            collection.getAll[Polygon].toList :::
            collection.getAll[MultiPolygon].flatMap(_.polygons).toList
          }.getOrElse(List[Polygon]())

        require(
          polygons.nonEmpty,
          s"No Polygon or MultiPolygon could be parsed from boundary GeoJSON at '$boundaryUri'"
        )

        MultiPolygon(polygons)
      }

      implicit val spark: SparkSession = Analytics.sparkSession("StatsJob")

      try {
        val history = spark.read.orc(historyUri)
        val df = createSnapshotDataFrame(history, mp, time)
        df.write.format("orc").save(snapshotUri)
      } finally {
        spark.stop()
      }
    }

    /**
     * Selects the history rows (at or before `time`) of every node inside `boundary`,
     * every way referencing such a node, and every relation that directly or transitively
     * has such a node or way as a member.
     *
     * NOTE: this method is unfinished. The final visibility step is not implemented, so it
     * currently always ends in `???` and throws `scala.NotImplementedError`.
     *
     * @param history  OSM history rows; the code reads the columns `type`, `id`, `timestamp`,
     *                 `lat`, `lon`, `nds` and `members`
     * @param boundary area of interest
     * @param time     snapshot time
     * @param spark    active Spark session
     * @return the snapshot DataFrame (once the final step is implemented)
     */
    def createSnapshotDataFrame(
      history: DataFrame,
      boundary: MultiPolygon,
      time: LocalDateTime
    )(implicit spark: SparkSession): DataFrame = {
      import spark.implicits._

      val preparedGeom = KryoWrapper(boundary.prepare)
      val boundingBox = boundary.envelope

      // Cheap bounding-box rejection first, exact prepared-geometry test only when needed.
      val isInBoundary =
        udf { (lat: Double, lon: Double) =>
          boundingBox.contains(lon, lat) && preparedGeom.value.contains(Point(lon, lat))
        }

      val ts = Timestamp.valueOf(time)
      val timeFiltered =
        history.
          where($"timestamp" <= ts)

      val nodeIds =
        timeFiltered.
          where($"type" === "node").
          where(isInBoundary($"lat", $"lon")).
          select($"id")

      // NOTE(review): assumes the elements of `nds` are directly comparable to node ids;
      // if `nds` is an array of structs the exploded value needs unwrapping — confirm schema.
      val wayIds =
        timeFiltered.
          where($"type" === "way").
          select($"id", explode($"nds").as("nodeId")).
          join(nodeIds.select($"id".as("nodeId")), "nodeId").
          drop($"nodeId")

      // We want to create a DF that has the relation ID,
      // the type of related element that is a way or node,
      // and the target ID.
      // OSM can have relations of relations, so we need to flatten any
      // relations out in a loop.

      // NOTE(review): assumes each member struct exposes `type` and `id` fields — confirm schema.
      val relationsToTargets =
        timeFiltered.
          where($"type" === "relation").
          select($"id", explode($"members").as("member")).
          select($"id", $"member.type".as("targetType"), $"member.id".as("targetId"))

      // (relation id, targetType, targetId) where the target is a node or a way.
      var flattenedRelationTargets =
        relationsToTargets.
          where($"targetType" =!= "relation")

      // Direct members of every relation, keyed by `joinId` so the join below has no
      // clashing column names. Column order after `joinId` mirrors flattenedRelationTargets.
      val memberLookup =
        relationsToTargets.
          select($"id".as("joinId"), $"targetType", $"targetId")

      // (relation id, nested relation id) pairs still to be expanded.
      var relationsOfRelations =
        relationsToTargets.
          where($"targetType" === "relation").
          select($"id", $"targetId".as("relationId")).
          distinct

      // Every pair ever queued; used to stop re-expanding pairs, which would otherwise
      // loop forever when relations contain each other cyclically.
      var seenRelationPairs = relationsOfRelations

      while (relationsOfRelations.count() > 0) {
        // Replace each nested relation by its own members, attributed to the outer relation.
        // The explicit select fixes the column order, because `union` matches by position.
        val flattened =
          relationsOfRelations.
            join(memberLookup, $"relationId" === $"joinId").
            select($"id", $"targetType", $"targetId")

        flattenedRelationTargets =
          flattened.where($"targetType" =!= "relation").union(flattenedRelationTargets)

        val nextPairs =
          flattened.
            where($"targetType" === "relation").
            select($"id", $"targetId".as("relationId")).
            distinct.
            except(seenRelationPairs)

        seenRelationPairs = seenRelationPairs.union(nextPairs)
        relationsOfRelations = nextPairs
      }

      // Relations with at least one member node inside the boundary.
      val nodeRelationIds =
        flattenedRelationTargets.
          where($"targetType" === "node").
          join(nodeIds.select($"id".as("targetId")), "targetId").
          select($"id")

      // Relations with at least one member way that was selected above.
      val wayRelationIds =
        flattenedRelationTargets.
          where($"targetType" === "way").
          join(wayIds.select($"id".as("targetId")), "targetId").
          select($"id")

      // OSM ids are only unique within an element type, so keep the type next to the id.
      val desiredElements =
        nodeIds.select(lit("node").as("joinType"), $"id".as("joinId")).
          union(wayIds.select(lit("way").as("joinType"), $"id".as("joinId"))).
          union(nodeRelationIds.select(lit("relation").as("joinType"), $"id".as("joinId"))).
          union(wayRelationIds.select(lit("relation").as("joinType"), $"id".as("joinId"))).
          distinct

      val filtered =
        timeFiltered.
          join(desiredElements, $"type" === $"joinType" && $"id" === $"joinId").
          drop("joinType", "joinId")

      // TODO: Now need to set the visibility correctly.
      // We need to refer back to the old history to make sure we don't
      // include anything that was deleted permanently, not because of a new version.
      // `filtered` is the input for that step; until it is written this method throws.

      ???
    }
  }
Add Comment
Please, Sign In to add comment