Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- package osmesa.analytics.oneoffs
- import osmesa.analytics._
- import cats.implicits._
- import com.monovore.decline._
- import geotrellis.vector._
- import geotrellis.vector.io._
- import geotrellis.vector.io.json._
- import geotrellis.spark.util.KryoWrapper
- import org.apache.spark.sql._
- import org.apache.spark.sql.functions._
- import spray.json._
- import java.sql.Timestamp
- import java.time._
- import java.time.format._
- import scala.util._
/** CLI entry point: parses options, validates the snapshot timestamp, and
  * delegates to [[ExportSnapshot.run]].
  */
object ExportSnapshotCommand extends CommandApp(
  name = "export-snapshot",
  header = "Exports a snapshot of OSM based on history and changeset ORCs, a bounding box, and a point in time.",
  main = {
    // All four options are required (decline fails the parse otherwise).
    val historyO = Opts.option[String]("history", help = "URI to the history ORC file to process.")
    val boundaryO = Opts.option[String]("boundary", help = "URI to geojson of boundary")
    val timeO = Opts.option[String]("time", help = "ISO 8601 format of snapshot date.")
    val snapshotPathO = Opts.option[String]("snapshotPath", help = "URI for output snapshot ORC")

    (historyO, boundaryO, timeO, snapshotPathO).mapN { (historyUri, boundaryUri, timeStr, snapshotUri) =>
      Try(LocalDateTime.parse(timeStr)) match {
        case Success(time) =>
          ExportSnapshot.run(historyUri, boundaryUri, time, snapshotUri)
        case Failure(e: DateTimeParseException) =>
          // A malformed --time is a user error: report it and exit non-zero
          // instead of dumping a stack trace.
          println(s"Could not parse date string '${timeStr}'. Make sure it's in ISO 8601 format. Parse error: $e")
          sys.exit(1)
        case Failure(e) =>
          // Anything else is unexpected — propagate.
          throw e
      }
    }
  }
)
/** Builds a filtered "snapshot" ORC of OSM history: every element version
  * authored at or before a given instant that lies inside (or references
  * something inside) a boundary polygon.
  */
object ExportSnapshot {
  /** Reads the entire contents of a local file into a string.
    *
    * @param path path readable by `scala.io.Source.fromFile`
    * @return the file contents; the source is closed even on failure
    */
  def readFile(path: String): String = {
    val src = scala.io.Source.fromFile(path)
    try {
      src.mkString
    } finally {
      src.close()
    }
  }

  /** Parses boundary GeoJSON into a single MultiPolygon.
    *
    * Accepts a bare Polygon, a bare MultiPolygon, or a FeatureCollection
    * containing Polygons and/or MultiPolygons; each representation is
    * attempted in turn and the successful parses are combined.
    */
  private def parseBoundary(geojson: String): MultiPolygon = {
    val polygons =
      Try(geojson.parseGeoJson[Polygon]).map(List(_)).getOrElse(List[Polygon]()) :::
      Try(geojson.parseGeoJson[MultiPolygon]).map(_.polygons.toList).getOrElse(List[Polygon]()) :::
      Try(geojson.parseGeoJson[JsonFeatureCollection]).map { collection =>
        collection.getAll[Polygon].toList :::
        collection.getAll[MultiPolygon].flatMap(_.polygons).toList
      }.getOrElse(List[Polygon]())
    MultiPolygon(polygons)
  }

  /** Runs the export: reads the boundary and history, filters, writes ORC.
    *
    * @param historyUri  URI of the history ORC file
    * @param boundaryUri path of the boundary GeoJSON (read once, not per parse attempt)
    * @param time        snapshot instant; versions after it are excluded
    * @param snapshotUri destination URI for the output ORC
    */
  def run(
    historyUri: String,
    boundaryUri: String,
    time: LocalDateTime,
    snapshotUri: String
  ): Unit = {
    // Read the boundary file a single time (previously it was re-read for
    // each candidate GeoJSON representation).
    val mp = parseBoundary(readFile(boundaryUri))

    implicit val spark = Analytics.sparkSession("StatsJob")
    try {
      val history = spark.read.orc(historyUri)
      val df = createSnapshotDataFrame(history, mp, time)
      df.write.format("orc").save(snapshotUri)
    } finally {
      // Always stop the session, even when the job fails.
      spark.stop()
    }
  }

  /** Filters `history` down to the elements relevant to the snapshot:
    * in-boundary nodes, ways referencing those nodes, and relations
    * (transitively flattened) referencing either.
    *
    * NOTE: visibility handling is still unimplemented — see the trailing TODO.
    */
  def createSnapshotDataFrame(
    history: DataFrame,
    boundary: MultiPolygon,
    time: LocalDateTime
  )(implicit spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Prepared geometry gives fast repeated point-in-polygon tests; the
    // KryoWrapper makes the (non-serializable) prepared geometry safe to
    // ship inside the UDF closure to executors.
    val preparedGeom = KryoWrapper(boundary.prepare)
    val boundingBox = boundary.envelope

    // Cheap bounding-box rejection first, exact containment test second.
    val isInBoundary =
      udf { (lat: Double, lon: Double) =>
        if (boundingBox.contains(lon, lat)) {
          preparedGeom.value.contains(Point(lon, lat))
        } else { false }
      }

    val ts = Timestamp.valueOf(time)

    // Discard every element version authored after the snapshot instant.
    val timeFiltered =
      history.
        where($"timestamp" <= ts)

    // Nodes physically inside the boundary.
    val nodeIds =
      timeFiltered.
        where($"type" === "node").
        where(isInBoundary($"lat", $"lon")).
        select($"id")

    // Ways that reference at least one in-boundary node.
    // FIX: rename the column itself ($"id".as("nodeId")) rather than
    // aliasing the DataFrame, so the using-column join can resolve.
    val wayIds =
      timeFiltered.
        where($"type" === "way").
        select($"id", explode($"nds").as("nodeId")).
        join(nodeIds.select($"id".as("nodeId")), "nodeId").
        drop($"nodeId")

    // Relation members flattened to (relation id, member type, member id).
    // OSM allows relations of relations, so relation-typed members are
    // iteratively replaced by the referenced relation's own members below.
    val relationsToTargets =
      timeFiltered.
        where($"type" === "relation").
        select($"id", explode($"members").as("member")).
        select($"id", $"member.type".as("targetType"), $"member.id".as("targetId"))

    var flattenedRelationTargets =
      relationsToTargets.
        where($"targetType" =!= "relation")

    var relationsOfRelations =
      relationsToTargets.
        where($"targetType" === "relation").
        select($"id", $"targetId".as("relationId"))

    while (relationsOfRelations.count() > 0) {
      // Rename the id column so the self-join is unambiguous.
      val df =
        relationsToTargets.select($"id".as("joinId"), $"targetId", $"targetType")

      // FIX: the original joined the *un-renamed* relationsToTargets on a
      // nonexistent "relation" column; join the renamed frame on
      // relationId === joinId, and select an exact
      // (id, targetType, targetId) schema so the union below lines up.
      val flattened =
        relationsOfRelations.
          join(df, relationsOfRelations.col("relationId") === df.col("joinId")).
          select(relationsOfRelations.col("id"), $"targetType", $"targetId")

      flattenedRelationTargets =
        flattened.where($"targetType" =!= "relation").union(flattenedRelationTargets)

      relationsOfRelations =
        flattened.
          where($"targetType" === "relation").
          select($"id", $"targetId".as("relationId"))
    }

    // Relations whose (flattened) membership touches an in-boundary node...
    val nodeRelationIds =
      flattenedRelationTargets.
        where($"targetType" === "node").
        join(nodeIds.select($"id".as("targetId")), "targetId").
        select($"id")

    // ...or an in-boundary way.
    // FIX: the original joined against nodeIds here, silently dropping
    // every relation that references an in-boundary way.
    val wayRelationIds =
      flattenedRelationTargets.
        where($"targetType" === "way").
        join(wayIds.select($"id".as("targetId")), "targetId").
        select($"id")

    val desiredIds =
      nodeIds.
        union(wayIds).
        union(nodeRelationIds).
        union(wayRelationIds).
        select($"id".as("joinId")).
        distinct

    val filtered =
      timeFiltered.
        join(desiredIds, timeFiltered.col("id") === desiredIds.col("joinId")).
        drop("joinId")

    // TODO: set the visibility correctly. We need to refer back to the old
    // history to make sure we don't include anything that was deleted
    // permanently, as opposed to superseded by a new version. Until then
    // this method is intentionally unimplemented past this point.
    ???
  }
}
Add Comment
Please sign in to add a comment.