Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- package spark;
- import java.text.DateFormat;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Set;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import com.google.common.collect.Iterables;
- import scala.Tuple2;
- public class PageRank {
- public static void main(String[] args) throws ParseException {
- DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- Date requestedTime = (Date) df.parse(args[3]).clone();
- int numIterations = Integer.parseInt(args[2]);
- //initialize configurations
- SparkConf conf = new SparkConf();
- Configuration jobConf = new Configuration();
- conf.setAppName("PageRank-v1");
- JavaSparkContext sc = new JavaSparkContext(conf);
- jobConf.set("textinputformat.record.delimiter", "\n\n");
- JavaPairRDD<LongWritable, Text> lines = sc.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class,
- Text.class, jobConf);
- //transform the line in the form of <artcile,record> which are both of type String
- JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
- String[] parts = s._2.toString().split("\\s+");
- String article = parts[3];
- return new Tuple2<String, String>(article, s._2.toString());
- }).groupByKey();
- //transform from <article,record> to <article,outlinks> where outlinks is a set of all outlinks from the main line in each record
- //also take only the latest edition which is before the user specified date
- //and filter null values
- links = links.mapValues(y -> {
- Iterable<String> newestRevision = null;
- Date newestRevisionTime = null;
- for (String revision : y) {
- String[] revLines = revision.split("\n");
- // for date
- String[] firstLine = revLines[0].split("\\s+");
- String article = firstLine[3];
- String dateStr = firstLine[4];
- // for outlinks
- String[] mainLine = revLines[3].split("\\s+");
- // no 2 routes
- Set<String> outlinks = new HashSet<String>(Arrays.asList(mainLine));
- outlinks.remove("MAIN");
- outlinks.remove(article);
- Date currentRevisionTime = df.parse(dateStr);
- if (currentRevisionTime.before(requestedTime)
- && (newestRevisionTime == null || currentRevisionTime.after(newestRevisionTime))) {
- newestRevision = outlinks;
- newestRevisionTime = (Date) currentRevisionTime.clone();
- }
- }
- return newestRevision;
- }).filter(x -> x._2 != null);
- //perform pagerank algorithm (from file)
- JavaPairRDD<Double,String> output = null;
- JavaPairRDD<String, Double> ranks = links.mapValues(s -> 1.0);
- for (int current = 0; current < numIterations; current++) {
- JavaPairRDD<String, Double> contribs = links.join(ranks).values().flatMapToPair(v -> {
- List<Tuple2<String, Double>> res = new ArrayList<Tuple2<String, Double>>();
- int urlCount = Iterables.size(v._1);
- for (String s : v._1)
- res.add(new Tuple2<String, Double>(s, v._2() / urlCount));
- return res;
- });
- ranks = contribs.reduceByKey((a, b) -> a + b).mapValues(v -> 0.15 + v * 0.85);
- output = ranks.mapToPair(x -> x.swap()).sortByKey(false);
- ranks = output.mapToPair(x -> x.swap());
- }
- ranks.saveAsTextFile(args[1]);
- sc.close();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement