Advertisement
Guest User

Untitled

a guest
Jan 23rd, 2017
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.90 KB | None | 0 0
  1. package ru.mipt;
  2.  
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  18. import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
  19. import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
  20. import org.apache.hadoop.util.Tool;
  21. import org.apache.hadoop.util.ToolRunner;
  22.  
  23. import java.io.IOException;
  24. import java.util.StringTokenizer;
  25.  
  26. public class GlobalSorter extends Configured implements Tool {
  27.  
  28.     public static class DeletePunctAndCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  29.         private final static LongWritable one = new LongWritable(1);
  30.         private Text word = new Text();
  31.         private Text prev_word = new Text();
  32.  
  33.         @Override
  34.         public void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException {
  35.             String str = line.toString().replaceAll("\\p{Punct}|\\d", " ");
  36.             StringTokenizer tokenizer = new StringTokenizer(str);
  37.             if (tokenizer.hasMoreTokens()) {
  38.                 prev_word.set(tokenizer.nextToken());
  39.             }
  40.             while (tokenizer.hasMoreTokens()) {
  41.                 word.set(tokenizer.nextToken());
  42.                 String pair = prev_word.toString() + " " + word.toString();
  43.                 context.write(new Text(pair), one);
  44.                 prev_word.set(word);
  45.             }
  46.         }
  47.     }
  48.  
  49.     public static class SumAndReverseReducer extends Reducer<Text, LongWritable, LongWritable, Text> {
  50.         private LongWritable count = new LongWritable();
  51.  
  52.         @Override
  53.         public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  54.             int sum = 0;
  55.             for (LongWritable num : values) {
  56.                 sum += num.get();
  57.             }
  58.             count.set(sum);
  59.             context.write(count, key);
  60.         }
  61.     }
  62.  
  63.     public static class StubMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
  64.         @Override
  65.         public void map(LongWritable num, Text word, Context context) throws IOException, InterruptedException {
  66.             context.write(num, word);
  67.         }
  68.     }
  69.  
  70.     public static class InverseReducer extends Reducer<LongWritable, Text, Text, LongWritable> {
  71.         @Override
  72.         public void reduce(LongWritable num, Iterable<Text> words, Context context) throws IOException, InterruptedException {
  73.             for (Text word : words) {
  74.                 context.write(word, num);
  75.             }
  76.         }
  77.     }
  78.  
  79.     private static void deleteFolder(FileSystem fs, Path... paths) throws IOException {
  80.         for (Path path: paths) {
  81.             if (fs.exists(path)) {
  82.                 fs.deleteOnExit(path);
  83.             }
  84.         }
  85.     }
  86.  
  87.     @Override
  88.     public int run(String[] strings) throws Exception {
  89.         Path inputPath = new Path(strings[0]);
  90.         Path outputPath = new Path(strings[1]);
  91.         Path midPath = new Path(strings[1] + "_tmp");
  92.         Integer reducersNum = Integer.parseInt(strings[3]);
  93.         Configuration conf = this.getConf();
  94.         FileSystem fs = FileSystem.get(conf);
  95.  
  96.         Job counter = Job.getInstance(conf);
  97.         counter.setJobName("wordcounter");
  98.         counter.setJarByClass(GlobalSorter.class);
  99.  
  100.         counter.setInputFormatClass(TextInputFormat.class);
  101.         counter.setOutputFormatClass(SequenceFileOutputFormat.class);
  102.  
  103.         counter.setMapperClass(DeletePunctAndCountMapper.class);
  104.         counter.setReducerClass(SumAndReverseReducer.class);
  105.         counter.setNumReduceTasks(reducersNum);
  106.  
  107.         counter.setOutputKeyClass(LongWritable.class);
  108.         counter.setOutputValueClass(Text.class);
  109.         counter.setMapOutputKeyClass(Text.class);
  110.         counter.setMapOutputValueClass(LongWritable.class);
  111.  
  112.         FileInputFormat.addInputPath(counter, inputPath);
  113.         SequenceFileOutputFormat.setOutputPath(counter, midPath);
  114.  
  115.         if (!counter.waitForCompletion(true)) {
  116.             deleteFolder(fs, midPath);
  117.             return -1;
  118.         }
  119.  
  120.         Path partPath = new Path(strings[1] + "_part");
  121.  
  122.         Job sorter = Job.getInstance(conf);
  123.         sorter.setJobName("sorter");
  124.         sorter.setJarByClass(GlobalSorter.class);
  125.  
  126.         sorter.setMapperClass(StubMapper.class);
  127.         sorter.setReducerClass(InverseReducer.class);
  128.  
  129.         sorter.setInputFormatClass(SequenceFileInputFormat.class);
  130.         sorter.setOutputFormatClass(TextOutputFormat.class);
  131.  
  132.         sorter.setOutputKeyClass(Text.class);
  133.         sorter.setOutputValueClass(Text.class);
  134.         sorter.setMapOutputKeyClass(LongWritable.class);
  135.         sorter.setMapOutputValueClass(Text.class);
  136.         sorter.setSortComparatorClass(LongWritable.DecreasingComparator.class);
  137.  
  138.         SequenceFileInputFormat.setInputPaths(sorter, midPath);
  139.         FileOutputFormat.setOutputPath(sorter, outputPath);
  140.         sorter.setNumReduceTasks(1);
  141.  
  142.         int resultCode = 0;
  143.         if (!sorter.waitForCompletion(true)) {
  144.             resultCode = -2;
  145.         }
  146.         deleteFolder(fs, midPath, partPath);
  147.         return resultCode;
  148.     }
  149.  
  150.     public static void main(String[] args) throws Exception {
  151.         ToolRunner.run(new GlobalSorter(), args);
  152.     }
  153. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement