Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package wikipedia
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.rdd.RDD
/** A single Wikipedia article.
  *
  * @param title the article's title
  * @param text  the article's body text
  */
case class WikipediaArticle(
  title: String,
  text: String
)
/** Ranks programming languages by the number of Wikipedia articles that
  * mention them, using three different Spark strategies, and records how
  * long each strategy takes.
  *
  * An article "mentions" a language when the language name appears as a
  * whole space-separated token of the article text.
  */
object WikipediaRanking {

  /** The languages to rank. */
  val langs: List[String] = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Wikipedia App")
  val sc: SparkContext = new SparkContext(conf)

  /** Every line of the file at `WikipediaData.filePath`, run through
    * `WikipediaData.parse`.
    *
    * Cached because the rankings below traverse this RDD many times
    * (`rankLangs` alone runs one job per language); without caching each job
    * would re-read and re-parse the input file.
    */
  val wikiRdd: RDD[WikipediaArticle] =
    sc.textFile(WikipediaData.filePath).map(WikipediaData.parse).cache()

  /** Accumulates one line per `timed` call; printed at the end of `main`. */
  val timing: StringBuffer = new StringBuffer

  /** Returns the number of articles on which the language `lang` occurs.
    *
    * @param lang the language name to look for (matched as a whole token)
    * @param rdd  the articles to scan
    * @return how many articles contain `lang` as a space-separated token
    */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
    rdd.aggregate(0)(
      // Whole-token match: a plain substring test would count e.g. "JRuby "
      // as "Ruby" and would miss a language name that ends the text.
      (count, article) => if (article.text.split(' ').contains(lang)) count + 1 else count,
      _ + _
    )

  /** Ranks `langs` by calling `occurrencesOfLang` once per language.
    *
    * @param langs the languages to rank
    * @param rdd   the articles to scan
    * @return every language paired with its article count, highest count first
    */
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    langs
      .map(lang => (lang, occurrencesOfLang(lang, rdd)))
      .sortBy { case (_, count) => count }(Ordering[Int].reverse)

  /** Computes an inverted index of the set of articles, mapping each language
    * to the Wikipedia pages in which it occurs.
    *
    * A language that occurs in no article produces no entry in the index.
    *
    * @param langs the languages to index
    * @param rdd   the articles to scan
    * @return pairs of (language, articles mentioning that language)
    */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] =
    rdd.flatMap { article =>
      // Tokenise once per article instead of once per language.
      val words = article.text.split(' ')
      langs.collect { case lang if words.contains(lang) => (lang, article) }
    }.groupByKey()

  /** Ranks languages from a pre-built inverted index.
    *
    * Only languages present in `index` appear in the result.
    *
    * @param index pairs of (language, articles mentioning that language)
    * @return each indexed language with its article count, highest count first
    */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
    index
      .map { case (lang, articles) => (lang, articles.size) }
      .collect()
      .toList
      .sortBy { case (_, count) => count }(Ordering[Int].reverse)

  /** Ranks languages by emitting a `(lang, 1)` pair per mention and summing
    * the pairs with `reduceByKey`.
    *
    * A language that occurs in no article does not appear in the result.
    *
    * @param langs the languages to rank
    * @param rdd   the articles to scan
    * @return each mentioned language with its article count, highest count first
    */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    rdd
      .flatMap { article =>
        // Tokenise once per article instead of once per language.
        val words = article.text.split(' ')
        langs.collect { case lang if words.contains(lang) => (lang, 1) }
      }
      .reduceByKey(_ + _)
      .collect()
      .toList
      .sortBy { case (_, count) => count }(Ordering[Int].reverse)

  /** Runs the three ranking strategies, prints their timings, and stops the
    * Spark context even if one of the jobs fails.
    */
  def main(args: Array[String]): Unit =
    try {
      val langsRanked: List[(String, Int)] =
        timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

      // Deliberately a `def`: the index is built only when referenced inside
      // the by-name argument below, so its construction is part of the Part 2
      // measurement.
      def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

      val langsRanked2: List[(String, Int)] =
        timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

      val langsRanked3: List[(String, Int)] =
        timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

      println(timing)
    } finally {
      sc.stop()
    }

  /** Evaluates `code`, appends its wall-clock duration to `timing`, and
    * returns its result.
    *
    * @param label description used in the timing line
    * @param code  by-name expression to evaluate and measure
    * @return the value produced by `code`
    */
  def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement