yadavshashank

Movie Lens Analysis Code

Jun 10th, 2018
45
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.12 KB | None | 0 0
  1. public class driver{
  2.  
  3.     public static void main(String args[]) throws Exception {
  4.    
  5.     //Chaining two map reduce jobs 
  6.        
  7.            
  8.         Configuration conf= new Configuration();
  9.         String parameter[]= new GenericOptionsParser(conf,args).getRemainingArgs();
  10.        
  11.         if(parameter.length!=3) {
  12.            
  13.             System.err.println("Three arguments needed  <File1> <File2> <Out>");
  14.             System.exit(2);
  15.         }
  16.        
  17.        
  18.        
  19.         //Configuration for first MR Job
  20.         Job job1 = new Job(conf, "Left Outer Join");
  21.         job1.setJarByClass(driver.class);
  22.        
  23.         //Multiple inputs to run multiple mappers in a single job
  24.         MultipleInputs.addInputPath(job1, new Path(parameter[0]), TextInputFormat.class,movieMapper.class);
  25.         MultipleInputs.addInputPath(job1, new Path(parameter[1]), TextInputFormat.class,ratingMapper.class);
  26.        
  27. //      job1.setCombinerClass(reducer1.class);
  28.         job1.setReducerClass(reducer1.class);
  29.        
  30.        
  31.         job1.setMapOutputKeyClass(IntWritable.class);
  32.         job1.setMapOutputValueClass(Text.class);
  33.         job1.setOutputKeyClass(NullWritable.class);
  34.         job1.setOutputValueClass(Text.class);
  35.        
  36.         //Output format for chaining
  37.         job1.setOutputFormatClass(SequenceFileOutputFormat.class);
  38.                
  39.         FileOutputFormat.setOutputPath(job1, new Path(parameter[2] + "/temp"));
  40.        
  41.         int code=job1.waitForCompletion(true) ? 0:1;
  42.        
  43.    
  44.         if(code==0) {
  45.             //Configuration for second MR job
  46.            
  47.             Job job2 = new Job(conf, "Top 10 Views");
  48.             job2.setJarByClass(driver.class);
  49.             job2.setMapperClass(top10Mapper.class);
  50.            
  51.             //Reducer needs to be one to generate top 10 values
  52.             job2.setNumReduceTasks(1);
  53. //          job2.setCombinerClass(reducer2.class);
  54.             job2.setReducerClass(reducer2.class);
  55.            
  56.             //Give (K,V) pairs in decreasing order to the reducer
  57.             job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
  58.            
  59.             job2.setMapOutputKeyClass(IntWritable.class);
  60.             job2.setMapOutputValueClass(Text.class);
  61.             job2.setOutputKeyClass(Text.class);
  62.             job2.setOutputValueClass(IntWritable.class);
  63.            
  64.             //Input format for chaining
  65.             job2.setInputFormatClass(SequenceFileInputFormat.class);
  66.             FileInputFormat.addInputPath(job2, new Path(parameter[2] + "/temp"));
  67.             FileOutputFormat.setOutputPath(job2, new Path(parameter[2] + "/final"));
  68.            
  69.            
  70.  
  71.             System.exit(job2.waitForCompletion(true) ? 0:1);
  72.         }
  73.            
  74.        
  75.     }
  76.  
  77.  
  78. //#####Job 1 (For Left Join)
  79.  
  80. public class movieMapper extends Mapper<LongWritable, Text, IntWritable, Text>  {
  81.    
  82.     public void map(LongWritable key, Text value,Context context)throws IOException,InterruptedException {
  83.        
  84.         String tokens[]=value.toString().trim().split("::");
  85.        
  86.         context.write(new IntWritable(Integer.parseInt(tokens[0])), new Text("2#"+tokens[1]));
  87.     }
  88.  
  89. }
  90.  
  91. public class ratingMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
  92.    
  93.     public void map(LongWritable key, Text value,Context context)throws IOException,InterruptedException {
  94.        
  95.         String tokens[]=value.toString().trim().split("::");
  96.        
  97.         //Value is concatenated with a digit to sort at a later stage
  98.         context.write(new IntWritable(Integer.parseInt(tokens[1])), new Text("1#"+tokens[0]));
  99.     }
  100. }
  101.  
  102.  
  103. public class reducer1 extends Reducer<IntWritable,Text,NullWritable,Text> {
  104.    
  105.     public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  106.        
  107.         //value : < UserId1, UserId2,...,UserIdn,Movie Name >
  108.        
  109.         ArrayList<String> token=new ArrayList<String>();
  110.        
  111.         for(Text val: values) {
  112.            
  113.             token.add(val.toString());
  114.         }
  115.        
  116.         Collections.sort(token);
  117.        
  118.         //Condition to check is the Movie ID exists in both files
  119.         if(token.size()>1) {
  120.            
  121.             for(int i=0; i<token.size();i++) {
  122.                 token.set(i,processValue(token.get(i).toString()));
  123.             }
  124.            
  125.              String reducerOutput = String.join(":", token);
  126.            
  127.            
  128.             context.write(NullWritable.get(), new Text(reducerOutput.toString()));
  129.         }
  130.        
  131.     }
  132.    
  133. //Function to remove the concatenated number from the value token
  134.    
  135.     String processValue(String rawToken) {
  136.        
  137.         String newToken[]=rawToken.split("#");
  138.        
  139.         return newToken[1];
  140.     }
  141.  
  142.  
  143.  
  144. //####Job 2 ( For finding top 10)
  145.  
  146. public class top10Mapper extends Mapper<NullWritable,Text,IntWritable, Text> {
  147.    
  148.     public void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException{
  149.        
  150.         String token[]=value.toString().split(":");
  151.        
  152.         int totalViews= token.length-1;
  153.        
  154.         context.write(new IntWritable(totalViews),new Text(token[totalViews]));
  155.         //                      Views                        Movie Name
  156.     }
  157.    
  158.  
  159. }
  160.  
  161. public class reducer2 extends Reducer<IntWritable,Text,Text,IntWritable> {
  162.    
  163.     //Counter to terminate at Top 10 movies
  164.     int topCount=0;
  165.    
  166.     protected void setup(Context context) throws IOException,InterruptedException{
  167.         topCount=0;
  168.        
  169.     }
  170.    
  171.     public void reducer(IntWritable key, Iterable<Text> values, Context context) throws IOException,InterruptedException {
  172.        
  173.        
  174.         //The output won't be restricted to only 10 values but to all those values which have a key in the range of top 10, which can be more than ten
  175.         if(topCount<10) {
  176.            
  177.             for(Text val:values) {
  178.                
  179.                 context.write(val, key);
  180.                 topCount++;
  181.             }
  182.         }
  183.     }
  184. }
Add Comment
Please, Sign In to add comment