Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
// Create a local SparkSession and load the CSV, reading the first row as a
// header and letting Spark infer each column's type.
val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
// FIX: "C:\gg.csv" is an invalid string literal — "\g" is not a legal escape
// sequence in Scala. Use a raw string (or "C:\\gg.csv") for Windows paths.
val df = spark.read.option("header", true).option("inferSchema", true).csv(raw"C:\gg.csv").cache()
- 12,13,14
- 11,10,5
- 3,2,45
- define,col1,col2,col3
- c1,12,13,14
- c2,11,10,5
- c3,3,2,45
- df.withColumn("columnName", column) //here "columnName" should be "define" for you
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row

// First off, the dataframe needs to be loaded with the expected schema
// (the raw file has no header row, so every column is declared explicitly).
// FIX: appName() requires a String argument — the original `appName()` does not compile.
val spark = SparkSession.builder().appName("my-spark-app").getOrCreate()

val schema = new StructType()
  .add("col1", IntegerType, true)
  .add("col2", IntegerType, true)
  .add("col3", IntegerType, true)

// FIX: "\g" is not a valid escape sequence in a Scala string literal;
// use a raw string for the Windows path.
val df = spark.read.format("csv").schema(schema).load(raw"C:\gg.csv").cache()

// Pair every row with its stable, partition-ordered index (a Long starting at 0).
val rddWithId = df.rdd.zipWithIndex

// Prepend a non-nullable "define" column of type String ("c0", "c1", ...).
// (The original comment said "of type Long", but the schema declares StringType.)
val newSchema = StructType(Array(StructField("define", StringType, false)) ++ df.schema.fields)

val dfZippedWithId = spark.createDataFrame(
  rddWithId.map { case (row, index) =>
    Row.fromSeq(Array("c" + index) ++ row.toSeq)
  },
  newSchema
)
- // Display the resulting DataFrame; expected output is shown in the table below
- dfZippedWithId.show
- +------+----+----+----+
- |define|col1|col2|col3|
- +------+----+----+----+
- | c0| 12| 13| 14|
- | c1| 11| 10| 5|
- | c2| 3| 2| 45|
- +------+----+----+----+
Add Comment
Please sign in to add a comment.