Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- package csw.hadoop.sandbox;
- import java.io.File;
- import java.io.IOException;
- import java.io.PrintStream;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FileUtil;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- public class SOCombinerDriver extends Configured implements Tool {
- public static class Step4Mapper extends
- Mapper<LongWritable, Text, Text, Text> {
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException {
- System.err.println("Mapper cleanup start");
- context.write(new Text("k1"), new Text("v1"));
- context.write(new Text("k2"), new Text("v2"));
- context.write(new Text("k3"), new Text("v3"));
- System.err.println("Mapper cleanup done");
- }
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- }
- }
- public static class Step4Combiner extends Reducer<Text, Text, Text, Text> {
- private static Text key0 = new Text();
- private static Text key1 = new Text();
- @Override
- protected void setup(Context context) throws IOException,
- InterruptedException {
- System.err.println("Combiner setup");
- }
- public void reduce(Text key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- System.err.println("combine: input: key:" + key);
- key0.set("KeyOne");
- key1.set("KeyTwo");
- context.write(key0, new Text("some value"));
- context.write(key1, new Text("some other value"));
- }
- }
- public static class Step4Reducer extends Reducer<Text, Text, Text, Text> {
- @Override
- protected void setup(Context context) throws IOException,
- InterruptedException {
- System.err.println("Reducer setup");
- }
- public void reduce(Text key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- System.err.print("reduce: Key:" + key.toString() + " Value: ");
- String theOutput = "";
- for (Text val : values) {
- System.err.print("," + val);
- }
- System.err.print("\n");
- context.write(key, new Text(theOutput));
- }
- }
- public static void main(String args[]) throws Exception {
- PrintStream ps = new PrintStream(new File("test.csv"));
- ps.println();
- ps.close();
- FileUtil.fullyDelete(new File("output"));
- String newArgs[] = new String[] { "-Dfs.default.name=file:///",
- "-Dmapred.job.tracker=local", "test.csv", "output" };
- ToolRunner.run(new SOCombinerDriver(), newArgs);
- }
- public static class GroupComparator extends Text.Comparator {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- int cmp = super.compare(b1, s1, l1, b2, s2, l2);
- System.err.println("Group cmp: " + new String(b1, s1, l2) + " ?= "
- + new String(b2, s2, l2) + " = " + cmp);
- return cmp;
- }
- }
- public static class SortComparator extends Text.Comparator {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- int cmp = super.compare(b1, s1, l1, b2, s2, l2);
- System.err.println("Sort cmp: " + new String(b1, s1, l2) + " ?= "
- + new String(b2, s2, l2) + " = " + cmp);
- return cmp;
- }
- }
- public int run(String[] args) throws Exception {
- Job job4 = new Job(getConf(), "Step 4");
- job4.setJarByClass(SOCombinerDriver.class);
- job4.setMapperClass(Step4Mapper.class);
- job4.setCombinerClass(Step4Combiner.class);
- job4.setReducerClass(Step4Reducer.class);
- job4.setGroupingComparatorClass(GroupComparator.class);
- job4.setSortComparatorClass(SortComparator.class);
- job4.setInputFormatClass(TextInputFormat.class);
- job4.setOutputKeyClass(Text.class);
- job4.setOutputValueClass(Text.class);
- FileInputFormat.addInputPaths(job4, args[0] + "," + args[0]);
- FileOutputFormat.setOutputPath(job4, new Path(args[1]));
- System.exit(job4.waitForCompletion(true) ? 0 : 1);
- return 0;
- }
- }
Add Comment
Please sign in to add a comment.