Advertisement
Guest User

Assignment 1

a guest
Mar 20th, 2017
191
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.82 KB | None | 0 0
  1. package wikipedia
  2.  
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.SparkContext
  5. import org.apache.spark.SparkContext._
  6.  
  7. import org.apache.spark.rdd.RDD
  8.  
  /** One Wikipedia article as loaded by the ranking job.
    *
    * @param title the article's title
    * @param text  the article's body text; the rankings search this for language names
    */
  case class WikipediaArticle(
      title: String,
      text: String
  )
  10.  
  /** Ranks programming languages by how many Wikipedia articles mention them.
    *
    * Uses three Spark strategies (a naive per-language scan, an inverted index,
    * and `reduceByKey`) and records how long each one takes.
    */
  object WikipediaRanking {

    val langs: List[String] = List(
      "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
      "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Wikipedia App")
    val sc: SparkContext = new SparkContext(conf)

    /** Every parsed article.
      *
      * Cached because each ranking strategy below scans it again (the naive ranking
      * once per language). Without `cache()`, Spark would re-read and re-parse the
      * input file on every pass.
      */
    val wikiRdd: RDD[WikipediaArticle] =
      sc.textFile(WikipediaData.filePath).map(WikipediaData.parse).cache()

    /** The whitespace-separated words of `text`, used to decide whether a language is mentioned.
      *
      * Whole-word matching is used instead of a substring test such as
      * `text.contains(lang + " ")`. A substring test counts a name embedded in a
      * longer word (e.g. "C++" inside "Objective-C++"). It also misses a name that
      * ends the text or is followed by a newline or tab.
      */
    private def words(text: String): Set[String] =
      text.split("\\s+").toSet

    /** Sorts (language, count) pairs by count, highest first.
      * `sortWith` is stable, so ties keep their input order.
      */
    private def byCountDescending(ranking: List[(String, Int)]): List[(String, Int)] =
      ranking.sortWith(_._2 > _._2)

    /** Returns the number of articles in which the language `lang` occurs as a whole word.
      *
      * @param lang the language name to look for
      * @param rdd  the articles to scan
      * @return how many articles mention `lang`
      */
    def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
      rdd.aggregate(0)(
        (count, article) => if (words(article.text).contains(lang)) count + 1 else count,
        _ + _
      )

    /** Naive ranking: runs one full pass over `rdd` per language.
      *
      * @return (language, article count) pairs, highest count first
      */
    def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
      byCountDescending(langs.map(lang => (lang, occurrencesOfLang(lang, rdd))))

    /* Compute an inverted index of the set of articles, mapping each language
     * to the Wikipedia pages in which it occurs (as a whole word).
     * Languages mentioned by no article do not appear as keys.
     */
    def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] =
      rdd
        .flatMap { article =>
          // Tokenize once per article rather than once per language.
          val mentioned = words(article.text)
          langs.filter(mentioned).map(lang => (lang, article))
        }
        .groupByKey()

    /** Ranks languages by the size of each language's article list in an inverted `index`.
      *
      * @return (language, article count) pairs, highest count first
      */
    def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
      byCountDescending(
        index
          .map { case (lang, articles) => (lang, articles.size) }
          .collect()
          .toList
      )

    /** Ranks languages by emitting (language, 1) per mention and summing with `reduceByKey`.
      *
      * Languages mentioned by no article are absent from the result.
      *
      * @return (language, article count) pairs, highest count first
      */
    def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
      byCountDescending(
        rdd
          .flatMap { article =>
            val mentioned = words(article.text)
            langs.filter(mentioned).map(lang => (lang, 1))
          }
          .reduceByKey(_ + _)
          .collect()
          .toList
      )

    /** Runs all three ranking strategies, prints their timings, and stops the SparkContext. */
    def main(args: Array[String]): Unit = {

      val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))
      def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
      val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))
      val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

      println(timing)
      sc.stop()
    }

    /** Accumulates one human-readable timing line per call to `timed`. */
    val timing = new StringBuffer

    /** Evaluates `code`, appends its wall-clock duration (labelled `label`) to `timing`,
      * and returns its result.
      */
    def timed[T](label: String, code: => T): T = {
      val start = System.currentTimeMillis()
      val result = code
      val stop = System.currentTimeMillis()
      timing.append(s"Processing $label took ${stop - start} ms.\n")
      result
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement