Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // *** Sample Data
- bash-3.2$ cat people_flat.json
- {"name":"Michael", "grade": 4.0}
- {"name":"Andy", "age":30, "grade": 3.5}
- {"name":"Justin", "age":19}
- bash-3.2$ cat people_reordered.json
- {"age":65, "name":"Biswa", "grade": 4.0}
- bash-3.2$ cat people_added_fields.json
- {"age":5, "name":"Uhuru", "grade": 4.5, "location": { "lat": 101.5, "lon": 68.0 } }
- bash-3.2$ cat people_nested_fields_reordered.json
- {"age":25, "name":"Jolie", "grade": 3.2, "location": { "lon": 42.0, "lat": 189.3} }
- // Test Schema Evolution in Iceberg
- import org.apache.spark.sql.types._ ;
- import org.apache.iceberg.hadoop.HadoopTables;
- import org.apache.iceberg.Schema;
- import org.apache.iceberg.spark.SparkSchemaUtil;
- // Create an Iceberg table (via HadoopTables) from an explicit Spark schema and append people_flat.json to it
- val sparkSchema = new StructType().add("name", StringType).add("age", IntegerType).add("grade", DoubleType) // explicit schema with three fields: name, age, grade
- val jsonDf = spark.read.schema(sparkSchema).json("people_flat.json") // read with the explicit schema; some sample records omit age or grade
- val tables = new HadoopTables() // also used by the later steps to re-load the table by path
- val iceSchema = SparkSchemaUtil.convert(sparkSchema) // Spark schema -> Iceberg schema
- val iceTable = tables.create(iceSchema, "./iceberg-schema-evolution-test") // create the table at this path before the first write
- jsonDf.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test") // append the flat records to the table just created
- spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show() // read the table back and print its rows
- // *** Add new data with re-ordered top level fields using original schema (people_reordered.json lists age before name)
- val originalTable = tables.load("./iceberg-schema-evolution-test") // re-load the table from its path
- val originalSchema = originalTable.schema // NOTE(review): not referenced anywhere else in the visible script
- val originalSparkSchema = SparkSchemaUtil.convert(originalTable.schema) // Iceberg schema -> Spark schema, taken from the table itself
- val jsonDfReordered = spark.read.schema(originalSparkSchema).json("people_reordered.json") // read the re-ordered record using the table's schema
- jsonDfReordered.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test") // append to the same table
- spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show() // print the table to inspect the result
- // *** Add new data with added column by transforming original schema and using that to write
- val originalTable = tables.load("./iceberg-schema-evolution-test") // redefines the earlier val of the same name; presumably run line by line in spark-shell -- confirm
- val locationType = new StructType().add("lat", DoubleType).add("lon", DoubleType) // Spark struct for the new nested column: lat, lon
- originalTable.updateSchema().addColumn("location", SparkSchemaUtil.convert(locationType).asStruct).commit() // add "location" as a struct column and commit the schema change
- originalTable.refresh() // refresh the table handle before reading its schema below
- val sparkSchemaAddedField = SparkSchemaUtil.convert(originalTable.schema) // Spark schema derived from the table after the column was added
- val jsonDfAddedFields = spark.read.schema(sparkSchemaAddedField).json("people_added_fields.json") // this sample record carries a "location" object
- jsonDfAddedFields.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test") // append to the same table
- spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show() // print the table to inspect the result
- // *** Add new data whose nested column struct has fields re-ordered (people_nested_fields_reordered.json lists lon before lat)
- val originalTable = tables.load("./iceberg-schema-evolution-test") // re-load the table; redefines the earlier val of the same name
- val originalSparkSchema = SparkSchemaUtil.convert(originalTable.schema) // Spark schema from the re-loaded table; redefines the earlier val
- val jsonDfNestedReordered = spark.read.schema(originalSparkSchema).json("people_nested_fields_reordered.json") // read the record using the table's schema
- jsonDfNestedReordered.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test") // append to the same table
- spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show() // print the table to inspect the result
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement