Advertisement
Guest User

Composite Key / Grouping MapReduce Example

a guest
Dec 19th, 2012
434
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 7.33 KB | None | 0 0
  1. package csw.hadoop.reuse;
  2.  
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. import java.io.PrintStream;
  7.  
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.conf.Configured;
  10. import org.apache.hadoop.fs.FileSystem;
  11. import org.apache.hadoop.fs.Path;
  12. import org.apache.hadoop.io.IntWritable;
  13. import org.apache.hadoop.io.LongWritable;
  14. import org.apache.hadoop.io.SequenceFile;
  15. import org.apache.hadoop.io.Text;
  16. import org.apache.hadoop.io.WritableComparable;
  17. import org.apache.hadoop.io.WritableComparator;
  18. import org.apache.hadoop.mapreduce.Job;
  19. import org.apache.hadoop.mapreduce.Mapper;
  20. import org.apache.hadoop.mapreduce.Reducer;
  21. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
  23. import org.apache.hadoop.util.Tool;
  24. import org.apache.hadoop.util.ToolRunner;
  25.  
  26. /**
  27.  * Example job which takes a dataset of [identifier,timestamp,speed] and
  28.  * calculates (naively) the total distance travelled by each identifier
  29.  * <p>
  30.  * Shows how a Key Group Comparator can be used, along with subtle key reuse in
  31.  * the reducer to achieve the output
  32.  */
  33. public class KeyReducerReuseExample extends Configured implements Tool {
  34.  
  35.     @Override
  36.     public int run(String[] arg0) throws Exception {
  37.         Job job = new Job(getConf());
  38.         job.setJarByClass(KeyReducerReuseExample.class);
  39.         Configuration conf = job.getConfiguration();
  40.  
  41.         job.setInputFormatClass(TextInputFormat.class);
  42.         TextInputFormat.setInputPaths(job, createTestFile(conf));
  43.  
  44.         job.setMapperClass(KrreMapper.class);
  45.         job.setReducerClass(KrreReducer.class);
  46.  
  47.         job.setOutputFormatClass(SequenceFileOutputFormat.class);
  48.         Path outputPath = new Path(conf.get("outputPath"));
  49.         FileSystem.get(conf).delete(outputPath, true);
  50.         SequenceFileOutputFormat.setOutputPath(job, outputPath);
  51.  
  52.         job.setMapOutputKeyClass(CompositeKey.class);
  53.         job.setMapOutputValueClass(IntWritable.class);
  54.  
  55.         job.setOutputKeyClass(Text.class);
  56.         job.setOutputValueClass(IntWritable.class);
  57.  
  58.         job.setGroupingComparatorClass(CompositeKeyGroupComparator.class);
  59.  
  60.         job.setNumReduceTasks(1);
  61.  
  62.         if (job.waitForCompletion(true)) {
  63.             dumpSeqFile(new Path(outputPath, "part-r-00000"));
  64.  
  65.             return 0;
  66.         } else {
  67.             return 1;
  68.         }
  69.     }
  70.  
  71.     private void dumpSeqFile(Path path) throws IOException {
  72.         Configuration conf = getConf();
  73.         SequenceFile.Reader reader = new SequenceFile.Reader(
  74.                 FileSystem.get(conf), path, conf);
  75.         Text key = new Text();
  76.         IntWritable val = new IntWritable();
  77.         while (reader.next(key, val)) {
  78.             System.err.printf("%s\t%s\n", key, val);
  79.         }
  80.         reader.close();
  81.  
  82.     }
  83.  
  84.     /**
  85.      * Create a test file
  86.      *
  87.      * @param conf
  88.      * @return
  89.      * @throws IOException
  90.      */
  91.     private String createTestFile(Configuration conf) throws IOException {
  92.         FileSystem fs = FileSystem.get(conf);
  93.         String file = "krre-test.csv";
  94.         PrintStream ps = new PrintStream(fs.create(new Path(file), true));
  95.  
  96.         long now = System.currentTimeMillis();
  97.  
  98.         for (long ts = now - 60000; ts <= now; ts += 1000) {
  99.             ps.printf("%s,%d,%d\n", "A", ts, 1);
  100.             ps.printf("%s,%d,%d\n", "B", ts, 2);
  101.             ps.printf("%s,%d,%d\n", "C", ts, 3);
  102.         }
  103.  
  104.         ps.close();
  105.  
  106.         return file;
  107.     }
  108.  
  109.     /**
  110.      * Container for a string identifier and a timestamp
  111.      */
  112.     public static class CompositeKey implements
  113.             WritableComparable<CompositeKey> {
  114.         public Text identifier = new Text();
  115.         public long timestamp;
  116.  
  117.         @Override
  118.         public void readFields(DataInput in) throws IOException {
  119.             identifier.readFields(in);
  120.             timestamp = in.readLong();
  121.         }
  122.  
  123.         @Override
  124.         public void write(DataOutput out) throws IOException {
  125.             identifier.write(out);
  126.             out.writeLong(timestamp);
  127.         }
  128.  
  129.         @Override
  130.         public int compareTo(CompositeKey o) {
  131.             int c = identifier.compareTo(o.identifier);
  132.             if (c != 0) {
  133.                 return c;
  134.             }
  135.  
  136.             if (timestamp < o.timestamp) {
  137.                 return -1;
  138.             } else {
  139.                 return timestamp == o.timestamp ? 0 : 1;
  140.             }
  141.         }
  142.  
  143.         @Override
  144.         public int hashCode() {
  145.             // we want to partition on the identifier field only, so all records
  146.             // values for the same identifier end up in 1 reducer
  147.             return identifier.hashCode();
  148.         }
  149.  
  150.         @Override
  151.         public String toString() {
  152.             return String.format("%s,%d", identifier, timestamp);
  153.         }
  154.     }
  155.  
  156.     /**
  157.      * Key grouper which only considers the identifier component of the key
  158.      */
  159.     public static class CompositeKeyGroupComparator extends WritableComparator {
  160.         protected CompositeKeyGroupComparator() {
  161.             super(CompositeKey.class, true);
  162.         }
  163.  
  164.         @Override
  165.         public int compare(WritableComparable a, WritableComparable b) {
  166.             return ((CompositeKey) a).identifier
  167.                     .compareTo(((CompositeKey) b).identifier);
  168.         }
  169.     }
  170.  
  171.     public static class KrreMapper extends
  172.             Mapper<LongWritable, Text, CompositeKey, IntWritable> {
  173.         protected CompositeKey outputKey = new CompositeKey();
  174.         protected IntWritable outputValue = new IntWritable();
  175.  
  176.         @Override
  177.         protected void map(LongWritable key, Text value, Context context)
  178.                 throws IOException, InterruptedException {
  179.  
  180.             String tokens[] = value.toString().split(",");
  181.             outputKey.identifier.set(tokens[0]);
  182.             outputKey.timestamp = Long.parseLong(tokens[1]);
  183.             outputValue.set(Integer.parseInt(tokens[2]));
  184.  
  185.             context.write(outputKey, outputValue);
  186.         }
  187.     }
  188.  
  189.     /**
  190.      * Naive reducer which calculates the distance travelled for a given
  191.      * identifer from collection of timestamp and speed measurements
  192.      * <p>
  193.      * Obviously this implementation is flawed in it's distance calculation
  194.      */
  195.     public static class KrreReducer extends
  196.             Reducer<CompositeKey, IntWritable, Text, IntWritable> {
  197.         @Override
  198.         protected void reduce(CompositeKey key, Iterable<IntWritable> values,
  199.                 Context context) throws IOException, InterruptedException {
  200.             long prevTimestamp = 0;
  201.             int dist = 0;
  202.             for (IntWritable value : values) {
  203.                 if (prevTimestamp != 0) {
  204.                     // subtly the component values of the key change with each
  205.                     // iteration so we can track the delta in timestamps
  206.                     long timeDelta = key.timestamp - prevTimestamp;
  207.                     dist += timeDelta * value.get();
  208.                 }
  209.  
  210.                 prevTimestamp = key.timestamp;
  211.             }
  212.  
  213.             context.write(key.identifier, new IntWritable(dist));
  214.         }
  215.     }
  216.  
  217.     public static void main(String args[]) throws Exception {
  218.         System.exit(ToolRunner.run(new KeyReducerReuseExample(), args));
  219.     }
  220. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement