//Scala/FirstDataFrame.scala
import org.apache.spark.sql.SparkSession
import spark.implicits._ // needed for toDF() and the $"..." column syntax outside spark-shell, where it is pre-imported

// Create a Seq of tuples with some sample customer data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)

// Parallelize the data into an RDD with 4 partitions
val customerRows = spark.sparkContext.parallelize(custs, 4)

// Convert the RDD of tuples to a DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

// There are many ways to look at a DataFrame; toString() is NOT one of them --
// it only returns the schema, not the data.
println("*** toString() just gives you the schema")
println(customerDF.toString())

println("*** show() gives you neatly formatted data")
customerDF.show()

println("*** use select() to choose one column")
customerDF.select("id").show()

println("*** use select() for multiple columns")
customerDF.select("sales", "state").show()

// The filter API is very similar to a WHERE clause in SQL
println("*** use filter() to choose rows")
customerDF.filter($"state".equalTo("CA")).show()

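// For comparison, a minimal sketch of the same filter expressed in Spark SQL
// (our own addition; the temp view name "customers" is ours, not from the original tutorial).
customerDF.createOrReplaceTempView("customers")
spark.sql("SELECT * FROM customers WHERE state = 'CA'").show()
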
//Scala/FirstDataset.scala
// This tutorial creates a Spark Dataset from a Scala Seq (Shift-Enter to run)

// Convert a Scala Seq to a Spark Dataset
val s = Seq(10, 11, 12, 13, 14, 15)
val ds = s.toDS()

println("*** only one column, and it always has the same name")
ds.columns.foreach(println(_))

println("*** column types")
ds.dtypes.foreach(println(_))

println("*** schema as if it was a DataFrame")
ds.printSchema()

println("*** values > 12")
ds.where($"value" > 12).show()

// Seq.range is an easy way to get a range that's actually a Seq, and
// thus easy to convert to a Dataset, rather than a Range, which isn't.
val s2 = Seq.range(1, 100)

println("*** size of the range")
println(s2.size)

// Create a Dataset from a Seq of tuples
val tuples = Seq((1, "one", "un"), (2, "two", "deux"), (3, "three", "trois"))
val tupleDS = tuples.toDS()

println("*** Tuple Dataset types")
tupleDS.dtypes.foreach(println(_))

// The tuple columns have unfriendly names (_1, _2, ...), but you can still use them to query
println("*** filter by one column and fetch another")
tupleDS.where($"_1" > 2).select($"_2", $"_3").show()

// Create a Dataset from a DataFrame
//case class Company(name: String, foundingYear: Int, numEmployees: Int)
//val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
//val df = sc.parallelize(inputSeq).toDF()
//val companyDS = df.as[Company]
//companyDS.show()

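// A runnable sketch of the commented-out example above (our own reworking; the names
// companySeq, companyDF and companyDS are ours). Assumes the notebook/spark-shell implicits are in scope.
case class Company(name: String, foundingYear: Int, numEmployees: Int)
val companySeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val companyDF = spark.createDataFrame(companySeq) // a generic DataFrame: the Company type is gone
val companyDS = companyDF.as[Company]             // .as[Company] recovers a typed Dataset
companyDS.show()
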
//FirstRDD.scala
// This tutorial creates a Spark RDD directly, though in practice you should usually just use DataFrames
// Put some data in an RDD with 4 partitions
val numbers = 1 to 10
val numbersRDD = sc.parallelize(numbers, 4)
println("Print each element of the original RDD")
numbersRDD.foreach(println)

// Trivially operate on the numbers
val stillAnRDD = numbersRDD.map(n => n.toDouble / 10)

// Get the data back out to the driver
val nowAnArray = stillAnRDD.collect()

// Note that collect() returns the elements in partition order, while foreach() on the
// RDD above printed them in whatever order the tasks happened to run.
println("Now print each element of the transformed array")
nowAnArray.foreach(println)

// Explore RDD properties:
// glom() returns an RDD created by coalescing all elements within each partition into an array.
val partitions = stillAnRDD.glom()
println("We _should_ have 4 partitions")
println(partitions.count())
partitions.foreach(a => {
  println("Partition contents:" +
    a.foldLeft("")((s, e) => s + " " + e))
})

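// Another way to inspect the partitioning (our own addition, not in the original paste):
// tag each element with the index of the partition it lives in.
val withPartitionIndex = stillAnRDD.mapPartitionsWithIndex((idx, iter) => iter.map(e => (idx, e)))
withPartitionIndex.collect().foreach { case (idx, e) => println(s"partition $idx -> $e") }
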
// This tutorial explores the interoperability between DataFrame and Dataset.
// Note that Dataset is covered in much greater detail in the Dataset tutorial.
//DataSetConversion.scala

case class Cust(id: Integer, name: String, sales: Double, discount: Double, state: String)

case class StateSales(state: String, sales: Double)

// Create a sequence of case class objects
// (we defined the case class above)
val custs = Seq(
  Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
  Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
  Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
  Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
  Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)

// Create the DataFrame without passing through an RDD
val customerDF = spark.createDataFrame(custs)

println("*** DataFrame schema")
customerDF.printSchema()

println("*** DataFrame contents")
customerDF.show()

println("*** Select and filter the DataFrame")
val smallerDF =
  customerDF.select("sales", "state").filter($"state".equalTo("CA"))
smallerDF.show()

// Convert it to a Dataset by specifying the type of the rows -- use a case
// class because we have one and it's most convenient to work with. Notice
// you have to choose a case class that matches the remaining columns.
// BUT also notice that the columns keep their order from the DataFrame --
// later you'll see a Dataset[StateSales] of the same type where the
// columns have the opposite order, because of the way it was created.
val customerDS = smallerDF.as[StateSales]

println("*** Dataset schema")
customerDS.printSchema()

// Select and other operations can be performed directly on a Dataset too,
// but be careful to read the documentation for Dataset -- there are
// "typed transformations", which produce a Dataset, and
// "untyped transformations", which produce a DataFrame. In particular,
// you need to project using a TypedColumn to get a Dataset.
val verySmallDS = customerDS.select($"sales".as[Double])

println("*** Dataset after projecting one column")
verySmallDS.show()

// If you select multiple columns on a Dataset you end up with a Dataset
// of tuple type, but the columns keep their names.
val tupleDS = customerDS.select($"state".as[String], $"sales".as[Double])

// You can also cast back to a Dataset of a case class. Notice this time
// the columns have the opposite order than the last Dataset[StateSales].
val betterDS = tupleDS.as[StateSales]

println("*** Dataset after projecting two columns -- case class version")
betterDS.show()

// Converting back to a DataFrame without making other changes is really easy
val backToDataFrame = tupleDS.toDF()

println("*** This time as a DataFrame")
backToDataFrame.show()

// While converting back to a DataFrame you can rename the columns
val renamedDataFrame = tupleDS.toDF("MyState", "MySales")

println("*** Again as a DataFrame but with renamed columns")
renamedDataFrame.show()

//TypeSafetyDatasets.scala
case class Person(name: String, age: Int)

val fpath = s"gs://bigdatax-spark-102/people.json"

val dataframe = spark.read.json(fpath)

// The salary column doesn't exist in the file, so with a DataFrame the mistake
// only surfaces at runtime:
// Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`salary`' given input columns: [age, name]; line 1 pos 0;
// 'Filter ('salary > 10000)
// +- Relation[age#8L,name#9] json
dataframe.filter("salary > 10000").show

// Datasets
// (uses the Person case class defined above)
val personRDD = spark.sparkContext.makeRDD(Seq(Person("A", 10), Person("B", 20)))
val personDF = spark.createDataFrame(personRDD)
val ds = personDF.as[Person]
ds.filter(p => p.age > 25).show()
ds.filter(p => p.name == "Andy").show()

// With a Dataset the same mistake is caught at compile time, because salary is not a field of Person.
// Uncommenting the next line fails with: error: value salary is not a member of Person
// ds.filter(p => p.salary > 25).show()

//Read JSON Datasets
val gfilepath = s"gs://bigdatax-spark-102/miserables.json"

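// A minimal sketch of actually reading that file (our own addition -- the original stops at the
// path, and the schema of miserables.json is not shown, so we just let Spark infer it).
val miserablesDF = spark.read.json(gfilepath) // miserablesDF is our own name
miserablesDF.printSchema()
miserablesDF.show(5, truncate = false)
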
//SparkUDFs.scala

// Note: despite the name, "dataset" here is a DataFrame (toDF returns a DataFrame)
val dataset = Seq((2, "hello"), (4, "world")).toDF("id", "text")
val dataframe = dataset.toDF()
dataframe.show()

// Define a regular Scala function
val upper: String => String = _.toUpperCase

// Define a UDF that wraps the upper Scala function defined above.
// You could also define the function in place, i.e. inside udf(),
// but separating plain Scala functions from Spark SQL's UDFs allows for easier testing.
import org.apache.spark.sql.functions.udf
val upperUDF = udf(upper)

val squared = (s: Int) => {
  s * s
}

val squareUDF = udf(squared)

// Apply the UDF to add an upper-cased column to the source data
dataset.withColumn("upper", upperUDF('text)).show
dataframe.withColumn("upper", upperUDF('text)).show

// Apply the squaring UDF to "dataset"
dataset.withColumn("squaredvalue", squareUDF('id)).show

// Apply the squaring UDF to "dataframe"
dataframe.withColumn("squaredvalue", squareUDF('id)).show

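// A small extra sketch (our own addition, not in the original paste): the same Scala function
// can also be registered for use inside SQL strings. The names "toUpperUDF" and "words" are ours.
spark.udf.register("toUpperUDF", upper)
dataframe.createOrReplaceTempView("words")
spark.sql("SELECT id, toUpperUDF(text) AS upper FROM words").show()
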
//QueryAnalyzer.scala

val inventory = spark
  .range(5)
  .withColumn("new_column", 'id + 5 as "plus5")

inventory.show()

// Show the parsed, analyzed, optimized, and physical plans
inventory.explain(extended = true)

// Alternate way: access the plans programmatically via queryExecution
val analyzedPlan = inventory.queryExecution.analyzed

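// The rest of the plans are also available on queryExecution (a minimal sketch, our own addition):
println(analyzedPlan)                            // analyzed logical plan
println(inventory.queryExecution.optimizedPlan)  // optimized logical plan
println(inventory.queryExecution.executedPlan)   // physical plan
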
//ReadWriteFiles.scala

val sourcePath = s"gs://meetup-bigdatax/sparkworkshop/resources/flightdata-json"
val destCSVPath = s"gs://meetup-bigdatax/resources/sparkworkshop/resources/flighdata.csv"
val destPartPath = s"gs://meetup-bigdatax/resources/sparkworkshop/resources/partitionedata"

// Schema of the JSON file
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
  new StructField("DEST_COUNTRY_NAME", StringType, true),
  new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  new StructField("count", LongType, false)))

// Load the JSON file (FAILFAST makes malformed records fail the read instead of being dropped)
val jsonFile = spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema).load(sourcePath)
// Show the file data
jsonFile.show(5)

// Write the data out as tab-separated CSV
jsonFile.write.format("csv").mode("overwrite").option("sep", "\t")
  .save(destCSVPath)

// Write the data partitioned by destination country (the default output format is Parquet)
jsonFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")
  .save(destPartPath)

// Read data from a SQL database over JDBC
val sqlDF = spark.read
  .format("jdbc")
  .option("driver", "org.postgresql.Driver")
  .option("url", "jdbc:postgresql://database_server")
  .option("dbtable", "schema.tablename")
  .option("user", "username").option("password", "my-secret-password")
  .load()

sqlDF.select("DEST_COUNTRY_NAME").distinct().show(5)

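// A hedged sketch (our own addition): writing a DataFrame back to the same database over JDBC;
// "schema.newtable" is a placeholder table name, like the connection details above.
sqlDF.write
  .format("jdbc")
  .option("driver", "org.postgresql.Driver")
  .option("url", "jdbc:postgresql://database_server")
  .option("dbtable", "schema.newtable")
  .option("user", "username").option("password", "my-secret-password")
  .mode("append")
  .save()
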
//ReadXMLFile
// Requires the spark-xml package (com.databricks:spark-xml) on the classpath
import org.apache.spark.sql.types.{ArrayType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.explode

val xmlFile = s"gs://meetup-bigdatax/sparkworkshop/resources/Item.xml"
val innerSchema = StructType(
  StructField("ItemData",
    ArrayType(
      StructType(
        StructField("IdKey", LongType, true) ::
        StructField("Value", LongType, true) :: Nil
      )
    ), true) :: Nil
)

val schema = StructType(
  StructField("CDate", StringType, true) ::
  StructField("ListItemData", innerSchema, true) :: Nil
)

import spark.sqlContext.implicits._
val df = spark.sqlContext.read.format("com.databricks.spark.xml")
  .option("rowTag", "Item")
  .schema(schema)
  .load(xmlFile)
  // Select the nested field and explode it to get the flattened result
  .withColumn("ItemData", explode($"ListItemData.ItemData"))
  .select("CDate", "ItemData.*") // select the required columns

df.show()

//Spark Streaming
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
//import org.apache.spark.streaming.flume._

/**
 * A Spark Streaming application that receives tweets on certain
 * keywords from the Twitter data source and finds the popular hashtags.
 *
 * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
 * <consumerKey>       - Twitter consumer key
 * <consumerSecret>    - Twitter consumer secret
 * <accessToken>       - Twitter access token
 * <accessTokenSecret> - Twitter access token secret
 * <keyword_1>         - The keyword to filter tweets
 * <keyword_n>         - Any number of keywords to filter tweets
 */
object SparkPopularHashTags {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {

    sc.setLogLevel("WARN")

    // Either pass the credentials and keywords as command-line arguments ...
    //val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    //val filters = args.takeRight(args.length - 4)

    // ... or hard-code the keywords to track
    val filters = Array("#trending", "#popular", "#tech", "#smartcities", "#MachineLearning")

    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth credentials (replace the placeholders with your own keys)
    System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
    System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
    System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")

    // Create a StreamingContext that produces a micro-batch (DStream) every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    // Pass the filter keywords as arguments
    // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split the tweet text on spaces and keep only the hashtags
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get the top hashtags over the previous 60-second window
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Get the top hashtags over the previous 10-second window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print tweets from the current DStream
    stream.print()

    // Print the popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}