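/*
 * Standalone Spark application that reproduces a schema-evolution issue seen
 * with Iceberg's Spark reader: after renaming a column via Table.updateSchema(),
 * Spark reads that filter on either the old or the new column name fail, while
 * filters on untouched columns still work.
 */
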
package com.box.dataplatform

import com.netflix.iceberg.PartitionSpec
import com.netflix.iceberg.expressions.Expressions
import com.netflix.iceberg.hadoop.HadoopTables
import com.netflix.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

case class TestData(id: String, value: String, key: Int, value1: String, value2: String)

object SchemaEvolution extends App {

  val location = s"/tmp/test_${DateTime.now().getMillis}"
  private val log = LoggerFactory.getLogger(SchemaEvolution.getClass.getName)

  def getSample(watermark: Long, label: String, partitions: Int, size: Int) = {
    val wLabel = label + watermark
    Stream.from(size)
      .map(x => TestData(wLabel + x, wLabel, x % partitions, wLabel + "1", wLabel + "2"))
      .take(size)
  }
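
  // e.g. getSample(ts, "test", 10, 1000) yields 1,000 rows whose `key` values
  // cycle through 0..9, so the identity-partitioned table created below ends
  // up with 10 partitions.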

  // Populate a new Iceberg table at `location` with generated test data.
  def initializeTestData(location: String) = {
    val spark = SparkSession
      .builder
      .appName("test app")
      .master("local[4]")
      .getOrCreate
    import spark.implicits._

    val conf = spark.sparkContext.hadoopConfiguration
    val tables = new HadoopTables(conf)

    val testData = getSample(DateTime.now().getMillis, "test", 10, 1000)
    // create a dataframe from the test data
    val df = testData.toSeq.toDF("id", "value", "key", "value1", "value2").repartition(100)

    // derive the iceberg schema from the dataframe schema
    val icebergSchema = SparkSchemaUtil.convert(df.schema)
    // build the partition spec for the iceberg table: identity partitioning on `key`
    val spec = PartitionSpec.builderFor(icebergSchema).identity("key").build
    // create a new iceberg table from the schema, spec and location
    val table = tables.create(icebergSchema, spec, location)

    df.orderBy(df("key")).write
      .partitionBy("key")
      .format("iceberg")
      .mode("append")
      .save(location)

    table.refresh()
  }
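
  // At this point the table metadata and the initial data files exist under
  // `location`, partitioned by `key`.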

  def schemaEvolution = {
    // populate data
    initializeTestData(location)

    val spark = SparkSession
      .builder
      .appName(this.getClass.getSimpleName)
      .master("local[4]")
      .getOrCreate
    val conf = spark.sparkContext.hadoopConfiguration
    val tables = new HadoopTables(conf)

    val table = tables.load(location)

    // verify we can load the table through spark
    var result = spark.read.format("iceberg").load(location)
    result.printSchema()
    val count = spark.read.format("iceberg").load(location).where("key = 0").count()
    log.info(s"count for key=0 : $count")

    // perform the update operation: rename column `value1` to `v1`
    table.updateSchema()
      .renameColumn("value1", "v1")
      .commit()
    table.refresh()
    result = spark.read.format("iceberg").load(location)
    result.printSchema()
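
    // Iceberg renames are metadata-only: columns are tracked by field id, so no
    // data files are rewritten and the schema printed above now shows `v1`.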

    try {
      // try a table scan with a filter on the renamed column
      table.refresh()
      val exp1 = Expressions.equal("v1", "0")
      val scan = table.newScan().filter(exp1)

      val f = scan.planFiles.asScala.toList.length
      val t = scan.planTasks.asScala.toList.length
      println("files to scan: " + f)
      println("tasks: " + t)
    } catch {
      case e: Exception => log.error(s"plan operation failed ${e.getMessage}", e)
    }
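
    // The try/catch above exercises Iceberg's native scan planning with the new
    // column name, independent of Spark's read path; the reads below go through
    // Spark SQL instead.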

    table.refresh()
    val df = spark.read.format("iceberg").load(location)

    // filtering on the new name `v1` causes an exception
    try {
      result = df.where("v1 = 0")
      log.info(s"final count is ${result.count()}")
    } catch {
      case e: Exception => log.error(s"operation failed ${e.getMessage}", e)
    }

    // filtering on the old name `value1` also causes an exception
    try {
      result = df.where("value1 = 0")
      log.info(s"final count is ${result.count()}")
    } catch {
      case e: Exception => log.error(s"operation failed ${e.getMessage}", e)
    }

    // filtering on any other field works
    try {
      result = df.where("key = 0")
      log.info(s"final count is ${result.count()}")
    } catch {
      case e: Exception => log.error(s"operation failed ${e.getMessage}", e)
    }
  }

  schemaEvolution
}