Advertisement
Guest User

Untitled

a guest
Dec 7th, 2016
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.01 KB | None | 0 0
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

  25. public class FriendshipCount {
  26.  
  27.   public static class TokenizerMapper
  28.        extends Mapper<Object, Text, Text, DoubleWritable>{
  29.  
  30.     static enum CountersEnum { INPUT_WORDS }
  31.  
  32.     private final static DoubleWritable one = new DoubleWritable(0.5);
  33.     private Text word = new Text();
  34.  
  35.     private boolean caseSensitive;
  36.     private Set<String> patternsToSkip = new HashSet<String>();
  37.  
  38.     private Configuration conf;
  39.     private BufferedReader fis;
  40.  
  41.     @Override
  42.     public void setup(Context context) throws IOException,
  43.         InterruptedException {
  44.       conf = context.getConfiguration();
  45.       caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
  46.       if (conf.getBoolean("wordcount.skip.patterns", true)) {
  47.         URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
  48.         for (URI patternsURI : patternsURIs) {
  49.           Path patternsPath = new Path(patternsURI.getPath());
  50.           String patternsFileName = patternsPath.getName().toString();
  51.           parseSkipFile(patternsFileName);
  52.         }
  53.       }
  54.     }
  55.  
  56.     private void parseSkipFile(String fileName) {
  57.       try {
  58.         fis = new BufferedReader(new FileReader(fileName));
  59.         String pattern = null;
  60.         while ((pattern = fis.readLine()) != null) {
  61.           patternsToSkip.add(pattern);
  62.         }
  63.       } catch (IOException ioe) {
  64.         System.err.println("Caught exception while parsing the cached file '"
  65.             + StringUtils.stringifyException(ioe));
  66.       }
  67.     }
  68.  
  69.     @Override
  70.     public void map(Object key, Text value, Context context
  71.                     ) throws IOException, InterruptedException {
  72.       String line = (caseSensitive) ?
  73.           value.toString() : value.toString().toLowerCase();
  74.       for (String pattern : patternsToSkip) {
  75.         line = line.replaceAll(pattern, "");
  76.       }
  77.       StringTokenizer itr = new StringTokenizer(line);
  78.       while (itr.hasMoreTokens()) {
  79.         word.set(itr.nextToken());
  80.     System.out.println(word);
  81.         context.write(word, one);
  82.         Counter counter = context.getCounter(CountersEnum.class.getName(),
  83.             CountersEnum.INPUT_WORDS.toString());
  84.         counter.increment(1);
  85.       }
  86.     }
  87.   }
  88.  
  89.   public static class IntSumReducer
  90.        extends Reducer<Text,DoubleWritable,Text,DoubleWritable> {
  91.     private DoubleWritable result = new DoubleWritable();
  92.  
  93.     public void reduce(Text key, Iterable<DoubleWritable> values,
  94.                        Context context
  95.                        ) throws IOException, InterruptedException {
  96.     double sum = 0.0;
  97.     System.out.println("==================== Reduce ===================");
  98.       for (DoubleWritable val : values) {
  99.     System.out.println(val.get());
  100.         sum += val.get();
  101.       }
  102.       result.set(sum);
  103.       context.write(key, result);
  104.     System.out.println("==================== Reduce ===================");
  105.     }
  106.   }
  107.  
  108.   public static void main(String[] args) throws Exception {
  109.     Configuration conf = new Configuration();
  110.     GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
  111.     String[] remainingArgs = optionParser.getRemainingArgs();
  112.     if (!(remainingArgs.length != 2 || remainingArgs.length != 4)) {
  113.       System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
  114.       System.exit(2);
  115.     }
  116.     Job job = Job.getInstance(conf, "word count");
  117.     job.setJarByClass(FriendshipCount.class);
  118.     job.setMapperClass(TokenizerMapper.class);
  119.     job.setCombinerClass(IntSumReducer.class);
  120.     job.setReducerClass(IntSumReducer.class);
  121.     job.setOutputKeyClass(Text.class);
  122.     job.setOutputValueClass(DoubleWritable.class);
  123.  
  124.     List<String> otherArgs = new ArrayList<String>();
  125.     for (int i=0; i < remainingArgs.length; ++i) {
  126.       if ("-skip".equals(remainingArgs[i])) {
  127.         job.addCacheFile(new Path(remainingArgs[++i]).toUri());
  128.         job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
  129.       } else {
  130.         otherArgs.add(remainingArgs[i]);
  131.       }
  132.     }
  133.     FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
  134.     FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
  135.  
  136.     System.exit(job.waitForCompletion(true) ? 0 : 1);
  137.   }
  138. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement