Advertisement
Guest User

Untitled

a guest
Nov 24th, 2011
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package cop.test;
  2.  
  3. import java.io.IOException;
  4. import java.util.Iterator;
  5. import java.util.StringTokenizer;
  6.  
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.conf.Configured;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.IntWritable;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.Text;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.Reducer;
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  20. import org.apache.hadoop.util.Tool;
  21. import org.apache.hadoop.util.ToolRunner;
  22.  
  23. // Bejoy: the class now extends Configured and implements Tool so that ToolRunner can invoke run().
  24. public class WordCountNewAPI extends Configured implements Tool {
  25.  
  26. public static class WordCountMapper extends Mapper<LongWritable,Text, Text, IntWritable> {
  27.  
  28. private final static IntWritable ONE = new IntWritable(1);
  29. private Text word = new Text();
  30.  
  31. @Override
  32. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  33.  
  34. String line = value.toString();
  35. StringTokenizer tokenizer = new StringTokenizer(line);
  36. while (tokenizer.hasMoreTokens()) {
  37. word.set(tokenizer.nextToken());
  38. context.write(word, ONE);
  39. }
  40. //Bejoy : it would actually throw a runtime exception due to mismatch in the map output key
  41. // super.map(key, value, context);
  42. }
  43.  
  44. }
  45.  
  46. public static class WordCountReducer extends Reducer<Text,IntWritable, Text, IntWritable> {
  47.  
  48. @Override
  49. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  50.  
  51. int sum = 0;
  52. Iterator<IntWritable> iter = values.iterator();
  53. while (iter.hasNext()) {
  54. sum += iter.next().get();
  55. }
  56.  
  57. context.write(key, new IntWritable(sum));
  58.  
  59. //Bejoy: Why you need to call the super map and reduce?
  60. // super.reduce(key, values, context);
  61. }
  62.  
  63. }
  64.  
  65. /**
  66. * @param args
  67. * @throws Exception
  68. */
  69. public static void main(String[] args) throws Exception {
  70.  
  71.  
  72. int res = ToolRunner.run(new Configuration(),new WordCountNewAPI(), args);
  73. System.exit(res);
  74. }
  75.  
  76. // Bejoy: Changed return type to make it compatible with Tool.run()
  77. public int run(String[] args) throws Exception {
  78.  
  79. Configuration conf = new Configuration();
  80.  
  81.  
  82. Job job = new Job(conf, "wordcount");
  83.  
  84. job.setJarByClass(WordCountNewAPI.class);
  85.  
  86. job.setOutputKeyClass(Text.class);
  87. job.setOutputValueClass(IntWritable.class);
  88.  
  89. job.setMapperClass(WordCountMapper.class);
  90. job.setCombinerClass(WordCountReducer.class);
  91. job.setReducerClass(WordCountReducer.class);
  92.  
  93. job.setInputFormatClass(TextInputFormat.class);
  94. job.setOutputFormatClass(TextOutputFormat.class);
  95.  
  96. FileInputFormat.setInputPaths(job, new Path("/samples/wc/input"));
  97. FileOutputFormat.setOutputPath(job, new Path("/samples/wc/output"));
  98.  
  99. boolean b = job.waitForCompletion(true);
  100. if (!b) {
  101. throw new IOException("error with job!");
  102. }
  103.  
  104. return job.waitForCompletion(true) ? 0 : 1;
  105. }
  106.  
  107. }
  108.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement