Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
- public class FindMax {
- public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
- private IntWritable val = new IntWritable();
- private Text department = new Text();
- public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
- String line = value.toString();
- StringTokenizer tokenizer = new StringTokenizer(line);
- while (tokenizer.hasMoreTokens()){
- department.set(tokenizer.nextToken());
- val.set(Integer.parseInt(tokenizer.nextToken()));
- }
- output.collect(department, val);
- }
- }
- public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
- public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
- int maxx = 0;
- while (values.hasNext()) {
- int temp = values.next().get();
- if (maxx < temp){
- maxx = temp;
- }
- }
- output.collect(key, new IntWritable(maxx));
- }
- }
- public static void main(String[] args) throws Exception {
- JobConf conf = new JobConf(FindMax.class);
- conf.setJobName("findmax");
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(IntWritable.class);
- conf.setMapperClass(Map.class);
- conf.setCombinerClass(Reduce.class);
- conf.setReducerClass(Reduce.class);
- conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(TextOutputFormat.class);
- FileInputFormat.setInputPaths(conf, new Path(args[0]));
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
- JobClient.runJob(conf);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement