Guest User

Stack Overflow http://stackoverflow.com/questions/14126964

a guest
Jan 2nd, 2013
388
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.00 KB | None | 0 0
  1. package csw.hadoop.sandbox;
  2.  
  3. import java.io.File;
  4. import java.io.IOException;
  5. import java.io.PrintStream;
  6.  
  7. import org.apache.hadoop.conf.Configured;
  8. import org.apache.hadoop.fs.FileUtil;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.LongWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.Reducer;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.util.Tool;
  19. import org.apache.hadoop.util.ToolRunner;
  20.  
  21. public class SOCombinerDriver extends Configured implements Tool {
  22.     public static class Step4Mapper extends
  23.             Mapper<LongWritable, Text, Text, Text> {
  24.  
  25.         @Override
  26.         protected void cleanup(Context context) throws IOException,
  27.                 InterruptedException {
  28.             System.err.println("Mapper cleanup start");
  29.  
  30.             context.write(new Text("k1"), new Text("v1"));
  31.             context.write(new Text("k2"), new Text("v2"));
  32.             context.write(new Text("k3"), new Text("v3"));
  33.            
  34.             System.err.println("Mapper cleanup done");
  35.         }
  36.  
  37.         @Override
  38.         protected void map(LongWritable key, Text value, Context context)
  39.                 throws IOException, InterruptedException {
  40.         }
  41.     }
  42.  
  43.     public static class Step4Combiner extends Reducer<Text, Text, Text, Text> {
  44.         private static Text key0 = new Text();
  45.         private static Text key1 = new Text();
  46.  
  47.         @Override
  48.         protected void setup(Context context) throws IOException,
  49.                 InterruptedException {
  50.             System.err.println("Combiner setup");
  51.         }
  52.  
  53.         public void reduce(Text key, Iterable<Text> values, Context context)
  54.                 throws IOException, InterruptedException {
  55.             System.err.println("combine: input: key:" + key);
  56.             key0.set("KeyOne");
  57.             key1.set("KeyTwo");
  58.             context.write(key0, new Text("some value"));
  59.             context.write(key1, new Text("some other value"));
  60.         }
  61.  
  62.     }
  63.  
  64.     public static class Step4Reducer extends Reducer<Text, Text, Text, Text> {
  65.  
  66.         @Override
  67.         protected void setup(Context context) throws IOException,
  68.                 InterruptedException {
  69.             System.err.println("Reducer setup");
  70.         }
  71.  
  72.         public void reduce(Text key, Iterable<Text> values, Context context)
  73.                 throws IOException, InterruptedException {
  74.             System.err.print("reduce: Key:" + key.toString() + " Value: ");
  75.             String theOutput = "";
  76.             for (Text val : values) {
  77.                 System.err.print("," + val);
  78.             }
  79.             System.err.print("\n");
  80.  
  81.             context.write(key, new Text(theOutput));
  82.         }
  83.  
  84.     }
  85.  
  86.     public static void main(String args[]) throws Exception {
  87.         PrintStream ps = new PrintStream(new File("test.csv"));
  88.         ps.println();
  89.         ps.close();
  90.  
  91.         FileUtil.fullyDelete(new File("output"));
  92.  
  93.         String newArgs[] = new String[] { "-Dfs.default.name=file:///",
  94.                 "-Dmapred.job.tracker=local", "test.csv", "output" };
  95.         ToolRunner.run(new SOCombinerDriver(), newArgs);
  96.     }
  97.  
  98.     public static class GroupComparator extends Text.Comparator {
  99.  
  100.         @Override
  101.         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  102.             int cmp = super.compare(b1, s1, l1, b2, s2, l2);
  103.             System.err.println("Group cmp: " + new String(b1, s1, l2) + " ?= "
  104.                     + new String(b2, s2, l2) + " = " + cmp);
  105.             return cmp;
  106.         }
  107.  
  108.     }
  109.  
  110.     public static class SortComparator extends Text.Comparator {
  111.  
  112.         @Override
  113.         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  114.             int cmp = super.compare(b1, s1, l1, b2, s2, l2);
  115.             System.err.println("Sort cmp: " + new String(b1, s1, l2) + " ?= "
  116.                     + new String(b2, s2, l2) + " = " + cmp);
  117.             return cmp;
  118.         }
  119.  
  120.     }
  121.  
  122.     public int run(String[] args) throws Exception {
  123.         Job job4 = new Job(getConf(), "Step 4");
  124.         job4.setJarByClass(SOCombinerDriver.class);
  125.  
  126.         job4.setMapperClass(Step4Mapper.class);
  127.         job4.setCombinerClass(Step4Combiner.class);
  128.         job4.setReducerClass(Step4Reducer.class);
  129.  
  130.         job4.setGroupingComparatorClass(GroupComparator.class);
  131.         job4.setSortComparatorClass(SortComparator.class);
  132.  
  133.         job4.setInputFormatClass(TextInputFormat.class);
  134.         job4.setOutputKeyClass(Text.class);
  135.         job4.setOutputValueClass(Text.class);
  136.  
  137.         FileInputFormat.addInputPaths(job4, args[0] + "," + args[0]);
  138.         FileOutputFormat.setOutputPath(job4, new Path(args[1]));
  139.  
  140.         System.exit(job4.waitForCompletion(true) ? 0 : 1);
  141.         return 0;
  142.     }
  143. }
Add Comment
Please, Sign In to add comment