Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class driver{
- public static void main(String args[]) throws Exception {
- //Chaining two map reduce jobs
- Configuration conf= new Configuration();
- String parameter[]= new GenericOptionsParser(conf,args).getRemainingArgs();
- if(parameter.length!=3) {
- System.err.println("Three arguments needed <File1> <File2> <Out>");
- System.exit(2);
- }
- //Configuration for first MR Job
- Job job1 = new Job(conf, "Left Outer Join");
- job1.setJarByClass(driver.class);
- //Multiple inputs to run multiple mappers in a single job
- MultipleInputs.addInputPath(job1, new Path(parameter[0]), TextInputFormat.class,movieMapper.class);
- MultipleInputs.addInputPath(job1, new Path(parameter[1]), TextInputFormat.class,ratingMapper.class);
- // job1.setCombinerClass(reducer1.class);
- job1.setReducerClass(reducer1.class);
- job1.setMapOutputKeyClass(IntWritable.class);
- job1.setMapOutputValueClass(Text.class);
- job1.setOutputKeyClass(NullWritable.class);
- job1.setOutputValueClass(Text.class);
- //Output format for chaining
- job1.setOutputFormatClass(SequenceFileOutputFormat.class);
- FileOutputFormat.setOutputPath(job1, new Path(parameter[2] + "/temp"));
- int code=job1.waitForCompletion(true) ? 0:1;
- if(code==0) {
- //Configuration for second MR job
- Job job2 = new Job(conf, "Top 10 Views");
- job2.setJarByClass(driver.class);
- job2.setMapperClass(top10Mapper.class);
- //Reducer needs to be one to generate top 10 values
- job2.setNumReduceTasks(1);
- // job2.setCombinerClass(reducer2.class);
- job2.setReducerClass(reducer2.class);
- //Give (K,V) pairs in decreasing order to the reducer
- job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
- job2.setMapOutputKeyClass(IntWritable.class);
- job2.setMapOutputValueClass(Text.class);
- job2.setOutputKeyClass(Text.class);
- job2.setOutputValueClass(IntWritable.class);
- //Input format for chaining
- job2.setInputFormatClass(SequenceFileInputFormat.class);
- FileInputFormat.addInputPath(job2, new Path(parameter[2] + "/temp"));
- FileOutputFormat.setOutputPath(job2, new Path(parameter[2] + "/final"));
- System.exit(job2.waitForCompletion(true) ? 0:1);
- }
- }
- //#####Job 1 (For Left Join)
- public class movieMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
- public void map(LongWritable key, Text value,Context context)throws IOException,InterruptedException {
- String tokens[]=value.toString().trim().split("::");
- context.write(new IntWritable(Integer.parseInt(tokens[0])), new Text("2#"+tokens[1]));
- }
- }
- public class ratingMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
- public void map(LongWritable key, Text value,Context context)throws IOException,InterruptedException {
- String tokens[]=value.toString().trim().split("::");
- //Value is concatenated with a digit to sort at a later stage
- context.write(new IntWritable(Integer.parseInt(tokens[1])), new Text("1#"+tokens[0]));
- }
- }
- public class reducer1 extends Reducer<IntWritable,Text,NullWritable,Text> {
- public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- //value : < UserId1, UserId2,...,UserIdn,Movie Name >
- ArrayList<String> token=new ArrayList<String>();
- for(Text val: values) {
- token.add(val.toString());
- }
- Collections.sort(token);
- //Condition to check is the Movie ID exists in both files
- if(token.size()>1) {
- for(int i=0; i<token.size();i++) {
- token.set(i,processValue(token.get(i).toString()));
- }
- String reducerOutput = String.join(":", token);
- context.write(NullWritable.get(), new Text(reducerOutput.toString()));
- }
- }
- //Function to remove the concatenated number from the value token
- String processValue(String rawToken) {
- String newToken[]=rawToken.split("#");
- return newToken[1];
- }
- //####Job 2 ( For finding top 10)
- public class top10Mapper extends Mapper<NullWritable,Text,IntWritable, Text> {
- public void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException{
- String token[]=value.toString().split(":");
- int totalViews= token.length-1;
- context.write(new IntWritable(totalViews),new Text(token[totalViews]));
- // Views Movie Name
- }
- }
- public class reducer2 extends Reducer<IntWritable,Text,Text,IntWritable> {
- //Counter to terminate at Top 10 movies
- int topCount=0;
- protected void setup(Context context) throws IOException,InterruptedException{
- topCount=0;
- }
- public void reducer(IntWritable key, Iterable<Text> values, Context context) throws IOException,InterruptedException {
- //The output won't be restricted to only 10 values but to all those values which have a key in the range of top 10, which can be more than ten
- if(topCount<10) {
- for(Text val:values) {
- context.write(val, key);
- topCount++;
- }
- }
- }
- }
Add Comment
Please, Sign In to add comment