Advertisement
JunkPile77

Junk

Mar 21st, 2019
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.33 KB | None | 0 0
  1. package org.apache.hadoop.ramapo;
  2. import java.io.IOException;
  3. import java.util.StringTokenizer;
  4. import java.util.Map;
  5. import java.util.TreeMap;
  6. import java.util.Arrays;
  7. import java.util.ArrayList;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.Writable;
  11. import org.apache.hadoop.io.IntWritable;
  12. import org.apache.hadoop.io.LongWritable;
  13. import org.apache.hadoop.io.NullWritable;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.Reducer;
  18. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20.  
  21. public class Project2{
  22.  
  23.   public static class MyMapper
  24.         extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
  25.             public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException{
  26.                 String[] line = value.toString().split("\t");
  27.                 //String[] line = value.toString().split(",");
  28.                 Integer user = Integer.parseInt(line[0]);
  29.                 Integer movie = Integer.parseInt(line[1]);
  30.                 context.write(new IntWritable(user), new IntWritable(movie));
  31.  
  32.             }
  33.     }
  34.  
  35.  
  36.   public static class MyReducer
  37.         extends Reducer<IntWritable, IntWritable, IntWritable, Text> {
  38.  
  39.             // first TM is for UserID : movies; the second is UserID : friendList
  40.             private TreeMap<Integer, ArrayList<Integer>> TM1 = new TreeMap<Integer, ArrayList<Integer>>();
  41.             public TreeMap<Integer, ArrayList<Integer>> TM2 = new TreeMap<Integer, ArrayList<Integer>>();
  42.             // will be used for the context (output will be the list of a user's liked movies)
  43.             String outputString = "";
  44.             public void reduce (IntWritable key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException{
  45.  
  46.                 // create a treemap where key is UserID and value is a
  47.                 // list of UserID's watched movies
  48.                 for(IntWritable x: value){
  49.                       Integer movieVal = x.get();
  50.                       ArrayList<Integer> valList = TM1.get(key.get());
  51.                       // if not in list, add new list to TM, else
  52.                       // append to TM's value list
  53.                       if (valList == null) {
  54.                       valList = new ArrayList<Integer>();
  55.                       valList.add(movieVal);
  56.                       TM1.put(key.get(), valList);
  57.                       } else {
  58.                         valList.add(movieVal);
  59.                       }
  60.  
  61.                       // populate the second TM; this one will contain UserID: Friend List
  62.                   TM1.forEach((k,v) -> {
  63.                             TM1.forEach((k2,v2) -> {
  64.                               // determine if k and k2 are friends
  65.                               // if value contains a single instance found in value2, they are friends
  66.                                 boolean isFriend = (v2.stream().filter(item -> v.contains(item.intValue())).findAny().orElse(null)) == null ? false : true;
  67.                                 if(isFriend) {
  68.                                   // if in map2, append value to list,
  69.                                   // else create entry with k as key, and k2 as first value
  70.                                   // in list
  71.                                     if(TM2.containsKey(k)) {
  72.                                         TM2.get(k).add(k2);
  73.                                       } else {
  74.                                         ArrayList<Integer> friendList = new ArrayList<Integer>();
  75.                                         friendList.add(k2);
  76.                                         TM2.put(k, friendList);
  77.                                       }
  78.                                     }
  79.                                   });
  80.                                 });
  81.  
  82.                   TM2.forEach((k, v) -> System.out.println("UserID: " + k + "\nFriend List: " + v + "\n\n"));
  83.                 }
  84.  
  85.                 for(ArrayList<Integer>  x: TM2.values()){
  86.                   outputString = Integer.toString(x.get(0));
  87.                   for (int i = 1; i < x.size(); i++)
  88.                   {
  89.                       outputString = outputString + "," + Integer.toString(x.get(i)) ;
  90.                   }
  91.                   outputString = outputString + "\n";
  92.                 }
  93.  
  94.                 context.write(new IntWritable(key.get()), new Text(outputString));
  95.  
  96.  
  97.             }
  98.     }
  99.  
  100.   public static void main(String[] args) throws Exception {
  101.         Configuration conf = new Configuration();
  102.         Job job = Job.getInstance(conf, "Project2");
  103.         job.setJarByClass(Project2.class);
  104.         job.setMapperClass(MyMapper.class);
  105.         //job.setCombinerClass(MyReducer.class);
  106.         job.setReducerClass(MyReducer.class);
  107.         job.setMapOutputKeyClass(IntWritable.class);
  108.         job.setMapOutputValueClass(IntWritable.class);
  109.         job.setOutputKeyClass(IntWritable.class);
  110.         job.setOutputValueClass(Text.class);
  111.         FileInputFormat.addInputPath(job, new Path(args[0]));
  112.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  113.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  114.   }
  115. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement