Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package cop.test;
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.StringTokenizer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- //Bejoy : included extends Configured implements Tool for run method
- public class WordCountNewAPI extends Configured implements Tool {
- public static class WordCountMapper extends Mapper<LongWritable,Text, Text, IntWritable> {
- private final static IntWritable ONE = new IntWritable(1);
- private Text word = new Text();
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String line = value.toString();
- StringTokenizer tokenizer = new StringTokenizer(line);
- while (tokenizer.hasMoreTokens()) {
- word.set(tokenizer.nextToken());
- context.write(word, ONE);
- }
- //Bejoy : it would actually throw a runtime exception due to mismatch in the map output key
- // super.map(key, value, context);
- }
- }
- public static class WordCountReducer extends Reducer<Text,IntWritable, Text, IntWritable> {
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- int sum = 0;
- Iterator<IntWritable> iter = values.iterator();
- while (iter.hasNext()) {
- sum += iter.next().get();
- }
- context.write(key, new IntWritable(sum));
- //Bejoy: Why you need to call the super map and reduce?
- // super.reduce(key, values, context);
- }
- }
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(),new WordCountNewAPI(), args);
- System.exit(res);
- }
- // Bejoy: Changed return type to make it compatible with Tool.run()
- public int run(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf, "wordcount");
- job.setJarByClass(WordCountNewAPI.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- job.setMapperClass(WordCountMapper.class);
- job.setCombinerClass(WordCountReducer.class);
- job.setReducerClass(WordCountReducer.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- FileInputFormat.setInputPaths(job, new Path("/samples/wc/input"));
- FileOutputFormat.setOutputPath(job, new Path("/samples/wc/output"));
- boolean b = job.waitForCompletion(true);
- if (!b) {
- throw new IOException("error with job!");
- }
- return job.waitForCompletion(true) ? 0 : 1;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement