Advertisement
Guest User

HFileInputFormat

a guest
Feb 9th, 2012
293
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.51 KB | None | 0 0
package org.gbif.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
  17.  
  18.  
  19. /**
  20.  * This is direct port (hopefully) of the Scala version of this class available on
  21.  * https://gist.github.com/1120311
  22.  *
  23.  * @author tim
  24.  */
  25. public class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, KeyValue> {
  26.  
  27.   private class HFileRecordReader implements RecordReader<ImmutableBytesWritable, KeyValue> {
  28.  
  29.     private HFile.Reader reader;
  30.     private final HFileScanner scanner;
  31.     private int entryNumber;
  32.  
  33.     public HFileRecordReader(FileSplit split, JobConf conf) throws IOException {
  34.       final Path path = split.getPath();
  35.       HFile.Reader reader = new HFile.Reader(FileSystem.get(conf), path, null, false);
  36.       scanner = reader.getScanner(false, false);
  37.       reader.loadFileInfo(); // This is required or else seekTo throws a NPE
  38.       scanner.seekTo(); // This is required or else scanner.next throws an error
  39.     }
  40.  
  41.     @Override
  42.     public void close() throws IOException {
  43.       if (reader != null) {
  44.         reader.close();
  45.       }
  46.     }
  47.  
  48.     @Override
  49.     public ImmutableBytesWritable createKey() {
  50.       return new ImmutableBytesWritable(scanner.getKeyValue().getRow());
  51.     }
  52.  
  53.     @Override
  54.     public KeyValue createValue() {
  55.       return scanner.getKeyValue();
  56.     }
  57.  
  58.     @Override
  59.     public long getPos() throws IOException {
  60.       return entryNumber;
  61.     }
  62.  
  63.     @Override
  64.     public float getProgress() throws IOException {
  65.       if (reader != null) {
  66.         return (entryNumber / reader.getEntries());
  67.       }
  68.       return 1;
  69.     }
  70.  
  71.     @Override
  72.     public boolean next(ImmutableBytesWritable key, KeyValue value) throws IOException {
  73.       entryNumber++;
  74.       return scanner.next();
  75.     }
  76.  
  77.   }
  78.  
  79.   @Override
  80.   public RecordReader<ImmutableBytesWritable, KeyValue> getRecordReader(InputSplit split, JobConf conf,
  81.     Reporter reporter) throws IOException {
  82.     return new HFileRecordReader((FileSplit) split, conf);
  83.   }
  84. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement