Advertisement
Guest User

Untitled

a guest
Oct 14th, 2019
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.92 KB | None | 0 0
  1. // *** Sample Data
  2.  
  3. bash-3.2$ cat people_flat.json
  4. {"name":"Michael", "grade": 4.0}
  5. {"name":"Andy", "age":30, "grade": 3.5}
  6. {"name":"Justin", "age":19}
  7.  
  8. bash-3.2$ cat people_reordered.json
  9. {"age":65, "name":"Biswa", "grade": 4.0}
  10.  
  11. bash-3.2$ cat people_added_fields.json
  12. {"age":5, "name":"Uhuru", "grade": 4.5, "location": { "lat": 101.5, "lon": 68.0 } }
  13.  
  14. bash-3.2$ cat people_nested_fields_reordered.json
  15. {"age":25, "name":"Jolie", "grade": 3.2, "location": { "lon": 42.0, "lat": 189.3} }
  16.  
  17.  
  18.  
  19.  
  20.  
  21.  
  22. // Test Schema Evolution in Iceberg
  23.  
  24. import org.apache.spark.sql.types._ ;
  25. import org.apache.iceberg.hadoop.HadoopTables;
  26. import org.apache.iceberg.Schema;
  27. import org.apache.iceberg.spark.SparkSchemaUtil;
  28.  
  29.  
  30. // *** Create the Iceberg table and load the initial data set
  31. // Table location, declared once so every step below targets the same table.
  32. val tablePath = "./iceberg-schema-evolution-test"
  33.  
  34. val sparkSchema = new StructType().add("name", StringType).add("age", IntegerType).add("grade", DoubleType)
  35. val jsonDf = spark.read.schema(sparkSchema).json("people_flat.json")
  36.  
  37. // Create the table from the Spark schema converted to an Iceberg schema.
  38. val tables = new HadoopTables()
  39. val iceSchema = SparkSchemaUtil.convert(sparkSchema)
  40. val iceTable = tables.create(iceSchema, tablePath)
  41.  
  42. jsonDf.write.format("iceberg").mode("append").save(tablePath)
  43. spark.read.format("iceberg").load(tablePath).show()
  44.  
  45.  
  46. // *** Add new data with re-ordered top level fields using original schema
  47. val originalTable = tables.load(tablePath)
  48. val originalSchema = originalTable.schema
  49. val originalSparkSchema = SparkSchemaUtil.convert(originalSchema)
  50.  
  51. // people_reordered.json lists "age" before "name"; it is read with the table's own schema.
  52. val jsonDfReordered = spark.read.schema(originalSparkSchema).json("people_reordered.json")
  53. jsonDfReordered.write.format("iceberg").mode("append").save(tablePath)
  54.  
  55. spark.read.format("iceberg").load(tablePath).show()
  56.  
  57.  
  58.  
  59.  
  60. // *** Add new data with added column by transforming original schema and using that to write
  61. // A fresh handle is loaded under its own name: re-declaring `val originalTable` only works when
  62. // the REPL evaluates one line at a time and is a compile error when pasted as a single unit.
  63. val evolvingTable = tables.load(tablePath)
  64. val locationType = new StructType().add("lat", DoubleType).add("lon", DoubleType)
  65.  
  66. evolvingTable.updateSchema().addColumn("location", SparkSchemaUtil.convert(locationType).asStruct).commit()
  67. evolvingTable.refresh()
  68.  
  69. val sparkSchemaAddedField = SparkSchemaUtil.convert(evolvingTable.schema)
  70. val jsonDfAddedFields = spark.read.schema(sparkSchemaAddedField).json("people_added_fields.json")
  71. jsonDfAddedFields.write.format("iceberg").mode("append").save(tablePath)
  72. spark.read.format("iceberg").load(tablePath).show()
  73.  
  74.  
  75.  
  76. // *** Add new data whose nested column struct has fields re-ordered
  77. val evolvedTable = tables.load(tablePath)
  78. val evolvedSparkSchema = SparkSchemaUtil.convert(evolvedTable.schema)
  79.  
  80. val jsonDfNestedReordered = spark.read.schema(evolvedSparkSchema).json("people_nested_fields_reordered.json")
  81.  
  82. jsonDfNestedReordered.write.format("iceberg").mode("append").save(tablePath)
  83.  
  84. spark.read.format("iceberg").load(tablePath).show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement