Advertisement
Guest User

Untitled

a guest
Mar 20th, 2018
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.57 KB | None | 0 0
  package spark;

  import java.text.DateFormat;
  import java.text.ParseException;
  import java.text.SimpleDateFormat;
  import java.time.Instant;
  import java.time.format.DateTimeParseException;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Date;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaSparkContext;

  import com.google.common.collect.Iterables;

  import scala.Tuple2;

  24. public class PageRank {
  25.  
  26.     public static void main(String[] args) throws ParseException {
  27.         DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
  28.         Date requestedTime = (Date) df.parse(args[3]).clone();
  29.         int numIterations = Integer.parseInt(args[2]);
  30.  
  31.         //initialize configurations
  32.         SparkConf conf = new SparkConf();
  33.         Configuration jobConf = new Configuration();
  34.         conf.setAppName("PageRank-v1");
  35.         JavaSparkContext sc = new JavaSparkContext(conf);
  36.         jobConf.set("textinputformat.record.delimiter", "\n\n");
  37.         JavaPairRDD<LongWritable, Text> lines = sc.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class,
  38.                 Text.class, jobConf);
  39.         //transform the line in the form of  <artcile,record> which are both of type String
  40.         JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
  41.             String[] parts = s._2.toString().split("\\s+");
  42.             String article = parts[3];
  43.             return new Tuple2<String, String>(article, s._2.toString());
  44.         }).groupByKey();
  45.        
  46.         //transform from <article,record> to <article,outlinks> where outlinks is a set of all outlinks from the main line in each record
  47.         //also take only the latest edition which is before the user specified date
  48.         //and filter null values
  49.         links = links.mapValues(y -> {
  50.             Iterable<String> newestRevision = null;
  51.             Date newestRevisionTime = null;
  52.             for (String revision : y) {
  53.                 String[] revLines = revision.split("\n");
  54.                 // for date
  55.                 String[] firstLine = revLines[0].split("\\s+");
  56.                 String article = firstLine[3];
  57.                 String dateStr = firstLine[4];
  58.                 // for outlinks
  59.                 String[] mainLine = revLines[3].split("\\s+");
  60.                 // no 2 routes
  61.  
  62.                 Set<String> outlinks = new HashSet<String>(Arrays.asList(mainLine));
  63.                 outlinks.remove("MAIN");
  64.                 outlinks.remove(article);
  65.                 Date currentRevisionTime = df.parse(dateStr);
  66.                 if (currentRevisionTime.before(requestedTime)
  67.                         && (newestRevisionTime == null || currentRevisionTime.after(newestRevisionTime))) {
  68.  
  69.                     newestRevision = outlinks;
  70.                     newestRevisionTime = (Date) currentRevisionTime.clone();
  71.                 }
  72.             }
  73.             return newestRevision;
  74.  
  75.         }).filter(x -> x._2 != null);
  76.  
  77.         //perform pagerank algorithm (from file)
  78.         JavaPairRDD<Double,String> output = null;
  79.        
  80.         JavaPairRDD<String, Double> ranks = links.mapValues(s -> 1.0);
  81.         for (int current = 0; current < numIterations; current++) {
  82.             JavaPairRDD<String, Double> contribs = links.join(ranks).values().flatMapToPair(v -> {
  83.                 List<Tuple2<String, Double>> res = new ArrayList<Tuple2<String, Double>>();
  84.                 int urlCount = Iterables.size(v._1);
  85.                 for (String s : v._1)
  86.                     res.add(new Tuple2<String, Double>(s, v._2() / urlCount));
  87.                 return res;
  88.             });
  89.             ranks = contribs.reduceByKey((a, b) -> a + b).mapValues(v -> 0.15 + v * 0.85);
  90.  
  91.             output = ranks.mapToPair(x -> x.swap()).sortByKey(false);
  92.             ranks = output.mapToPair(x -> x.swap());
  93.         }
  94.        
  95.         ranks.saveAsTextFile(args[1]);
  96.         sc.close();
  97.  
  98.     }
  99. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement