Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- /* SimpleApp.scala */
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- import org.apache.spark.sql.DataFrame
/** One row of the rankings dataset: a page URL, its rank, and the
  * average visit duration. `final` because case classes should not be
  * extended (equals/hashCode contracts break under inheritance).
  */
final case class Rank(pageURL: String, pageRank: Int, avgDuration: Int)
object SimpleApp {

  /** Loads the rankings text file into a DataFrame, registers it as the
    * temp table "rankings", and optionally caches it.
    *
    * @param sc         active Spark context used to read the text file
    * @param sqlContext SQL context used for table registration and caching
    * @param parts      target partition count; values <= 0 leave the input
    *                   partitioning untouched
    * @param cached     when true, the "rankings" table is cached in memory
    * @return the rankings DataFrame (also queryable as table "rankings")
    */
  def loadRankings(sc: SparkContext, sqlContext: org.apache.spark.sql.SQLContext, parts: Int, cached: Boolean = true): DataFrame = {
    import sqlContext.implicits._
    val textFile = sc.textFile("/var/scratch/vdbogert/var/lib/bigDataBenchmark/5nodes.txt")
    // Repartition (with shuffle) only when an explicit partition count was
    // requested; RDD transformations are lazy, so building it conditionally
    // matches the original behavior.
    val input = if (parts > 0) textFile.coalesce(parts, shuffle = true) else textFile
    // Input is CSV: pageURL,pageRank,avgDuration — malformed rows will throw,
    // same as the original.
    val schemaRanks = input.map(_.split(",")).map(r => Rank(r(0), r(1).toInt, r(2).toInt)).toDF()
    schemaRanks.registerTempTable("rankings")
    if (cached) {
      sqlContext.cacheTable("rankings")
      println("Dataset being cached")
    } else {
      println("Data not being cached")
    }
    // First query forces materialization so the cache is populated before
    // the caller starts timing subsequent queries.
    sqlContext.sql("SELECT COUNT(1) FROM rankings").collect()
    schemaRanks
  }

  /** Entry point. Expects four arguments:
    * <parts> <reps> <query> <cached>
    * and runs `query` against the "rankings" table `reps` times.
    */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage hint instead of an opaque index error.
    require(args.length >= 4, "usage: SimpleApp <parts> <reps> <query> <cached>")
    val parts  = args(0).toInt
    val reps   = args(1).toInt
    val query  = args(2)
    val cached = args(3).toBoolean

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    loadRankings(sc, sqlContext, parts, cached)

    System.out.println("Starting loop with reps")
    for (_ <- 1 to reps) {
      sqlContext.sql(query).collect()
    }
    // stop() has a side effect, so it keeps its parentheses.
    sc.stop()
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement