- //Scala/FirstDataFrame.scala
- import org.apache.spark.sql.SparkSession
- // create an RDD of tuples with some data
- val custs = Seq(
- (1, "Widget Co", 120000.00, 0.00, "AZ"),
- (2, "Acme Widgets", 410500.00, 500.00, "CA"),
- (3, "Widgetry", 410500.00, 200.00, "CA"),
- (4, "Widgets R Us", 410500.00, 0.0, "CA"),
- (5, "Ye Olde Widgete", 500.00, 0.0, "MA")
- )
- //distribute the local collection into an RDD with 4 partitions
- val customerRows = spark.sparkContext.parallelize(custs, 4)
- //Convert RDD of tuples to DataFrame by supplying column names
- val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")
- //There are many ways to see the schema of your DataFrame; toString() below is what you should NOT do!
- println("*** toString() just gives you the schema")
- println(customerDF.toString())
- println("*** show() gives you neatly formatted data")
- customerDF.show()
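- //added example: printSchema() prints the schema as a readable tree and is the usual way to inspect it
- println("*** printSchema() gives you the schema as a tree")
- customerDF.printSchema()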
- println("*** use select() to choose one column")
- customerDF.select("id").show()
- println("*** use select() for multiple columns")
- customerDF.select("sales", "state").show()
- //the filter API is very similar to a WHERE clause in SQL
- println("*** use filter() to choose rows")
- customerDF.filter($"state".equalTo("CA")).show()
- //Scala/FirstDataset.scala
- //This tutorial creates a Spark Dataset from a Scala Seq object (Shift-Enter to run)
- //Convert a Scala Seq object to a Spark Dataset
- val s = Seq(10, 11, 12, 13, 14, 15)
- val ds = s.toDS()
- // only one column, and it always has the same name
- println("*** only one column, and it always has the same name")
- ds.columns.foreach(println(_))
- //println("*** only one column, and it always has the same name")
- ds.columns.foreach(println(_))
- //println("*** column types")
- ds.dtypes.foreach(println(_))
- //println("*** schema as if it was a DataFrame")
- ds.printSchema()
- println("*** values > 12")
- ds.where($"value" > 12).show()
- // This seems to be the best way to get a range that's actually a Seq and
- // thus easy to convert to a Dataset, rather than a Range, which isn't.
- val s2 = Seq.range(1, 100)
- println("*** size of the range")
- println(s2.size)
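- //added example: the Seq produced by Seq.range converts to a Dataset directly, as noted above
- val s2DS = s2.toDS()
- println("*** number of rows in the resulting Dataset")
- println(s2DS.count())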
- //create a Dataset from a Seq of tuples
- val tuples = Seq((1, "one", "un"), (2, "two", "deux"), (3, "three", "trois"))
- val tupleDS = tuples.toDS()
- println("*** Tuple Dataset types")
- tupleDS.dtypes.foreach(println(_))
- // the tuple columns have unfriendly names, but you can use them to query
- println("*** filter by one column and fetch another")
- tupleDS.where($"_1" > 2).select($"_2", $"_3").show()
- //Create Dataset from DataFrame
- //case class Company(name: String, foundingYear: Int, numEmployees: Int)
- //val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
- //val df = sc.parallelize(inputSeq).toDF()
- //val companyDS = df.as[Company]
- //companyDS.show()
- //FirstRDD.scala
- //This tutorial creates a Spark RDD, though in most cases you should prefer DataFrames
- // put some data in an RDD in 4 partitions
- val numbers = 1 to 10
- val numbersRDD = sc.parallelize(numbers, 4)
- println("Print each element of the original RDD")
- numbersRDD.foreach(println)
- // trivially operate on the numbers
- val stillAnRDD = numbersRDD.map(n => n.toDouble / 10)
- // get the data back out
- val nowAnArray = stillAnRDD.collect()
- // note: collect() returns the elements in partition order, so the array prints in order,
- // while the distributed foreach above printed in whatever order the tasks ran
- println("Now print each element of the transformed array")
- nowAnArray.foreach(println)
- // explore RDD properties
- //glom() returns an RDD created by coalescing all elements within each partition into an array
- val partitions = stillAnRDD.glom()
- println("We _should_ have 4 partitions")
- println(partitions.count())
- partitions.foreach(a => {
- println("Partition contents:" +
- a.foldLeft("")((s, e) => s + " " + e))
- })
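- //added example: getNumPartitions reports the partition count without collecting anything
- println("Number of partitions: " + stillAnRDD.getNumPartitions)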
- //##### This tutorial explores the interoperability between DataFrame and Dataset. Note that Dataset is covered in much greater detail in the Dataset tutorial.
- //DataSetConversion.scala
- case class Cust(id: Integer, name: String, sales: Double, discount: Double, state: String)
- case class StateSales(state: String, sales: Double)
- // create a sequence of case class objects
- // (we defined the case class above)
- val custs = Seq(
- Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
- Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
- Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
- Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
- Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
- )
- // Create the DataFrame without passing through an RDD
- val customerDF = spark.createDataFrame(custs)
- println("*** DataFrame schema")
- customerDF.printSchema()
- println("*** DataFrame contents")
- customerDF.show()
- println("*** Select and filter the DataFrame")
- val smallerDF =
- customerDF.select("sales", "state").filter($"state".equalTo("CA"))
- smallerDF.show()
- // Convert it to a Dataset by specifying the type of the rows -- use a case
- // class because we have one and it's most convenient to work with. Notice
- // you have to choose a case class that matches the remaining columns.
- // BUT also notice that the columns keep their order from the DataFrame --
- // later you'll see a Dataset[StateSales] of the same type where the
- // columns have the opposite order, because of the way it was created.
- val customerDS = smallerDF.as[StateSales]
- println("*** Dataset schema")
- customerDS.printSchema()
- // Select and other operations can be performed directly on a Dataset too,
- // but be careful to read the documentation for Dataset -- there are
- // "typed transformations", which produce a Dataset, and
- // "untyped transformations", which produce a DataFrame. In particular,
- // you need to project using a TypedColumn to get a Dataset.
- val verySmallDS = customerDS.select($"sales".as[Double])
- println("*** Dataset after projecting one column")
- verySmallDS.show()
- // If you select multiple columns on a Dataset you end up with a Dataset
- // of tuple type, but the columns keep their names.
- val tupleDS = customerDS.select($"state".as[String], $"sales".as[Double])
- // You can also cast back to a Dataset of a case class. Notice this time
- // the columns have the opposite order than the last Dataset[StateSales]
- val betterDS = tupleDS.as[StateSales]
- println("*** Dataset after projecting two columns -- case class version")
- betterDS.show()
- // Converting back to a DataFrame without making other changes is really easy
- val backToDataFrame = tupleDS.toDF()
- println("*** This time as a DataFrame")
- backToDataFrame.show()
- // While converting back to a DataFrame you can rename the columns
- val renamedDataFrame = tupleDS.toDF("MyState", "MySales")
- println("*** Again as a DataFrame but with renamed columns")
- renamedDataFrame.show()
- //TypeSafetyDatasets.scala
- case class Person(name: String, age: Int)
- val fpath = s"gs://bigdatax-spark-102/people.json"
- val dataframe = spark.read.json(fpath)
- //Column doesn't exist
- //Will throw an error
- //Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`salary`' given input columns: [age, name]; line 1 pos 0;
- //'Filter ('salary > 10000)
- //+- Relation[age#8L,name#9] json
- dataframe.filter("salary > 10000").show
- //Datasets
- // case class Person(name : String , age : Int)
- val personRDD = spark.sparkContext.makeRDD(Seq(Person("A",10),Person("B",20)))
- val personDF = spark.createDataFrame(personRDD)
- val ds = personDF.as[Person]
- ds.filter(p => p.age > 25).show()
- ds.filter(p => p.name == "Andy").show()
- //This fails at compile time because salary is not a field of the Person case class
- //(uncomment to see the error: value salary is not a member of Person)
- //ds.filter(p => p.salary > 25).show()
- //Read JSON Datasets
- val gfilepath = s"gs://bigdatax-spark-102/miserables.json"
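- //a sketch of reading that file, assuming it is line-delimited JSON at the path above
- val gdf = spark.read.json(gfilepath)
- gdf.printSchema()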
- //SparkUDFs.scala
- val dataset = Seq((2, "hello"), (4, "world")).toDF("id", "text")
- val dataframe = dataset.toDF() // toDF() here is a no-op copy; both values are DataFrames
- dataframe.show()
- // Define a regular Scala function
- val upper: String => String = _.toUpperCase
- // Define a UDF that wraps the upper Scala function defined above
- // You could also define the function in place, i.e. inside udf
- // but separating Scala functions from Spark SQL's UDFs allows for easier testing
- import org.apache.spark.sql.functions.udf
- val upperUDF = udf(upper)
- val squared = (s: Int) => {
- s * s
- }
- val squareUDF = udf(squared)
- // Apply the UDF to change the source dataset
- dataset.withColumn("upper", upperUDF('text)).show
- dataframe.withColumn("upper", upperUDF('text)).show
- // UDF for Datasets
- dataset.withColumn("squaredvalue", squareUDF('id)).show
- // UDF for DataFrames
- dataframe.withColumn("squaredvalue", squareUDF('id)).show
- //QueryAnalyzer.scala
- val inventory = spark
- .range(5)
- .withColumn("new_column", 'id + 5 as "plus5")
- inventory.show()
- //Show the parsed, analyzed, optimized, and physical plans
- inventory.explain(extended = true)
- //Alternate way: access the plans programmatically via queryExecution
- val analyzedPlan = inventory.queryExecution.analyzed
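- println(analyzedPlan)
- //the optimized plan is exposed on queryExecution as well
- println(inventory.queryExecution.optimizedPlan)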
- //ReadWriteFiles.scala
- val sourcePath=s"gs://meetup-bigdatax/sparkworkshop/resources/flightdata-json"
- val destCSVPath = s"gs://meetup-bigdatax/resources/sparkworkshop/resources/flighdata.csv"
- val destPartPath=s"gs://meetup-bigdatax/resources/sparkworkshop/resources/partitionedata"
- //schema of the JSON file
- import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
- val myManualSchema = new StructType(Array(
- new StructField("DEST_COUNTRY_NAME", StringType, true),
- new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
- new StructField("count", LongType, false) ))
- //load the JSON file
- val jsonFile = spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema).load(sourcePath)
- //show the file data
- jsonFile.show(5)
- //write the file data in CSV format
- jsonFile.write.format("csv").mode("overwrite").option("sep", "\t")
- .save(destCSVPath)
- //write the data partitioned by destination country
- jsonFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")
- .save(destPartPath)
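- //added example: read the partitioned output back (Parquet is the default when no format is given);
- //the DEST_COUNTRY_NAME partition column is reconstructed from the directory names
- val partitionedDF = spark.read.load(destPartPath)
- partitionedDF.show(5)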
- //Read data from a SQL database over JDBC
- val sqlDF = spark.read
- .format("jdbc")
- .option("driver", "org.postgresql.Driver")
- .option("url", "jdbc:postgresql://database_server")
- .option("dbtable", "schema.tablename")
- .option("user", "username").option("password","my-secret-password").load()
- sqlDF.select("DEST_COUNTRY_NAME").distinct().show(5)
- //ReadXMLFile
- val xmlFile = s"gs://meetup-bigdatax/sparkworkshop/resources/Item.xml"
- import org.apache.spark.sql.types.{StructType, StructField, ArrayType, StringType, LongType}
- import org.apache.spark.sql.functions.explode
- val innerSchema = StructType(
- StructField("ItemData",
- ArrayType(
- StructType(
- StructField("IdKey",LongType,true)::
- StructField("Value",LongType,true)::Nil
- )
- ),true)::Nil
- )
- val schema = StructType(
- StructField("CDate",StringType,true)::
- StructField("ListItemData", innerSchema, true):: Nil
- )
- import spark.sqlContext.implicits._
- val df = spark.sqlContext.read.format("com.databricks.spark.xml")
- .option("rowTag", "Item")
- .schema(schema)
- .load(xmlFile)
- //Select the nested field and explode to get the flattened result
- .withColumn("ItemData", explode($"ListItemData.ItemData"))
- .select("CDate", "ItemData.*") // select the required columns
- df.show()
- //Spark Streaming
- import org.apache.spark.streaming.{ Seconds, StreamingContext }
- import org.apache.spark.SparkContext._
- import org.apache.spark.streaming.twitter._
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming._
- import org.apache.spark.{ SparkContext, SparkConf }
- import org.apache.spark.storage.StorageLevel
- //import org.apache.spark.streaming.flume._
- /**
- * A Spark Streaming application that receives tweets on certain
- * keywords from the Twitter data source and finds the popular hashtags
- *
- * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
- * <consumerKey> - Twitter consumer key
- * <consumerSecret> - Twitter consumer secret
- * <accessToken> - Twitter access token
- * <accessTokenSecret> - Twitter access token secret
- * <keyword_1> - The keyword to filter tweets
- * <keyword_n> - Any number of keywords to filter tweets
- *
- */
- object SparkPopularHashTags {
- val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
- val sc = new SparkContext(conf)
- def main(args: Array[String]) {
- sc.setLogLevel("WARN")
- //either pass the credentials and keywords as command-line arguments:
- //val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
- //val filters = args.takeRight(args.length - 4)
- val filters = Array("#trending","#popular","#tech","#smartcities","#MachineLearning");
- // Set the system properties so that the Twitter4j library used by the twitter stream
- // can use them to generate OAuth credentials
- System.setProperty("twitter4j.oauth.consumerKey", <comsumerKey>)
- System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret> )
- System.setProperty("twitter4j.oauth.accessToken", <accessToken>)
- System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>)
- // Create a StreamingContext with a 5-second batch interval
- val ssc = new StreamingContext(sc, Seconds(5))
- // Pass the filter keywords to the Twitter stream
- // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
- val stream = TwitterUtils.createStream(ssc, None, filters)
- // Split the stream on space and extract hashtags
- val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
- // Get the top hashtags over the previous 60 sec window
- val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
- .map { case (topic, count) => (count, topic) }
- .transform(_.sortByKey(false))
- // Get the top hashtags over the previous 10 sec window
- val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
- .map { case (topic, count) => (count, topic) }
- .transform(_.sortByKey(false))
- // print tweets in the current DStream
- stream.print()
- // Print popular hashtags
- topCounts60.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
- })
- topCounts10.foreachRDD(rdd => {
- val topList = rdd.take(10)
- println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
- })
- ssc.start()
- ssc.awaitTermination()
- }
- }