package com.box.dataplatform

import com.netflix.iceberg.PartitionSpec
import com.netflix.iceberg.expressions.Expressions
import com.netflix.iceberg.hadoop.HadoopTables
import com.netflix.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

case class TestData(id: String, value: String, key: Int, value1: String, value2: String)

object SchemaEvolution extends App {
  val location = s"/tmp/test_${DateTime.now().getMillis}"
  private val log = LoggerFactory.getLogger(SchemaEvolution.getClass.getName)

  // generate `size` rows tagged with the watermark, spread across `partitions` keys
  def getSample(watermark: Long, label: String, partitions: Int, size: Int) = {
    val wLabel = label + watermark
    Stream.from(size)
      .map(x => TestData(wLabel + x, wLabel, x % partitions, wLabel + "1", wLabel + "2"))
      .take(size)
  }

  // populate a fresh Iceberg table at `location` with sample data
  def initializeTestData(location: String) = {
    val spark = SparkSession
      .builder
      .appName("test app")
      .master("local[4]")
      .getOrCreate
    import spark.implicits._
    val conf = spark.sparkContext.hadoopConfiguration
    val tables = new HadoopTables(conf)
    val testData = getSample(DateTime.now().getMillis, "test", 10, 1000)
    // create a DataFrame from the test data
    val df = testData.toDF("id", "value", "key", "value1", "value2").repartition(100)
    // derive the Iceberg schema from the Spark schema
    val icebergSchema = SparkSchemaUtil.convert(df.schema)
    // identity-partition the table on "key"
    val spec = PartitionSpec.builderFor(icebergSchema).identity("key").build
    // create a new Iceberg table with that schema and spec at `location`
    val table = tables.create(icebergSchema, spec, location)
    df.orderBy(df("key")).write
      .partitionBy("key")
      .format("iceberg")
      .mode("append")
      .save(location)
    table.refresh()
  }
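
  // Hedged sketch, not in the original paste: a small helper to dump the
  // current schema and partition spec, handy for eyeballing each evolution
  // step. schema() and spec() are standard Table accessors in this API.
  def describeTable(tables: HadoopTables, location: String): Unit = {
    val table = tables.load(location)
    log.info(s"schema:\n${table.schema()}")
    log.info(s"spec: ${table.spec()}")
  }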

  def schemaEvolution = {
    // populate the table first
    initializeTestData(location)
    val spark = SparkSession
      .builder
      .appName(this.getClass.getSimpleName)
      .master("local[4]")
      .getOrCreate
    val conf = spark.sparkContext.hadoopConfiguration
    val tables = new HadoopTables(conf)
    val table = tables.load(location)

    // verify we can load the table through Spark
    var result = spark.read.format("iceberg").load(location)
    result.printSchema()
    // 1000 rows over 10 keys, so key = 0 should count 100 rows
    val count = spark.read.format("iceberg").load(location).where("key = 0").count()
    log.info(s"count for key=0 : $count")

    // rename a column via the Iceberg schema-evolution API
    table.updateSchema()
      .renameColumn("value1", "v1")
      .commit()
    table.refresh()
    result = spark.read.format("iceberg").load(location)
    result.printSchema()

    try {
      // plan a scan against the renamed column using the table API
      table.refresh()
      val exp1 = Expressions.equal("v1", "0")
      val scan = table.newScan().filter(exp1)
      val fileCount = scan.planFiles.asScala.toList.length
      val taskCount = scan.planTasks.asScala.toList.length
      println("files to scan: " + fileCount)
      println("tasks: " + taskCount)
    } catch {
      case e: Exception => log.error(s"plan operation failed ${e.getMessage}", e)
    }
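
    // Note (added explanation): the table-level scan above can plan because
    // Iceberg binds "v1" against the current schema and tracks columns by ID,
    // so a rename only rewrites metadata. The failures that the catch blocks
    // below log come from the Spark read path in this pre-Apache build.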

    table.refresh()
    val df = spark.read.format("iceberg").load(location)
    // filtering on the new name "v1" throws
    try {
      result = df.where("v1 = 0")
      log.info(s"final count is ${result.count()}")
    } catch {
      case e: Exception => log.error(s"operation failed ${e.getMessage}", e)
    }
    // filtering on the old name "value1" also throws: it is no longer in the schema
    try {
      result = df.where("value1 = 0")
      log.info(s"final count is ${result.count()}")
    } catch {
      case e: Exception => log.error(s"operation failed ${e.getMessage}", e)
    }
    // filtering on any other (untouched) field still works
    try {
      result = df.where("key = 0")
      log.info(s"final count is ${result.count()}")
    } catch {
      case e: Exception => log.error(s"operation failed ${e.getMessage}", e)
    }
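
    // Summary of what this demo shows: the rename commits cleanly in Iceberg
    // metadata and table-API scans still plan, but this Spark reader rejects
    // both the new name ("v1") and the old one ("value1"); only untouched
    // columns remain queryable.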
  }

  schemaEvolution
}
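
// A minimal follow-up sketch, not in the original paste: exercising another
// UpdateSchema operation (addColumn) on the same kind of table. Types lives
// under com.netflix.iceberg.types in this pre-Apache build; the object name,
// the column name "v3", and the hard-coded path are illustrative assumptions.
object AddColumnDemo extends App {
  import com.netflix.iceberg.types.Types
  import org.apache.hadoop.conf.Configuration

  val location = "/tmp/test_table" // assumed path of an existing table
  val tables = new HadoopTables(new Configuration())
  val table = tables.load(location)

  // add an optional string column; like rename, this is a metadata-only commit
  table.updateSchema()
    .addColumn("v3", Types.StringType.get())
    .commit()
  table.refresh()
}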