Guest User

Untitled

a guest
Dec 18th, 2018
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.43 KB | None | 0 0
  1. val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
  2. val df = spark.read.option("header",true).option("inferSchema",true).csv("C:\gg.csv").cache()
  3.  
  4. 12,13,14
  5. 11,10,5
  6. 3,2,45
  7.  
  8. define,col1,col2,col3
  9. c1,12,13,14
  10. c2,11,10,5
  11. c3,3,2,45
  12.  
  13. df.withColumn("columnName", column) // here "columnName" would be "define" in your case
  14.  
  15. import org.apache.spark.sql.SparkSession
  16. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  17. import org.apache.spark.sql.Row
  18.  
  19.  
  20. //First off the dataframe needs to be loaded with the expected schema
  21.  
  22. val spark = SparkSession.builder().appName().getOrCreate()
  23.  
  24.  
  25. val schema = new StructType()
  26. .add("col1",IntegerType,true)
  27. .add("col2",IntegerType,true)
  28. .add("col3",IntegerType,true)
  29.  
  30. val df = spark.read.format("csv").schema(schema).load("C:\gg.csv").cache()
  31.  
  32. val rddWithId = df.rdd.zipWithIndex
  33.  
  34. // Prepend "define" column of type Long
  35. val newSchema = StructType(Array(StructField("define", StringType, false)) ++ df.schema.fields)
  36.  
  37. val dfZippedWithId = spark.createDataFrame(rddWithId.map{
  38. case (row, index) =>
  39. Row.fromSeq(Array("c" + index) ++ row.toSeq)}, newSchema)
  40. // Show results
  41. dfZippedWithId.show
  42.  
  43. +------+----+----+----+
  44. |define|col1|col2|col3|
  45. +------+----+----+----+
  46. | c0| 12| 13| 14|
  47. | c1| 11| 10| 5|
  48. | c2| 3| 2| 45|
  49. +------+----+----+----+
Add Comment
Please, Sign In to add comment