Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package org.gbif.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
- /**
- * This is direct port (hopefully) of the Scala version of this class available on
- * https://gist.github.com/1120311
- *
- * @author tim
- */
- public class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, KeyValue> {
- private class HFileRecordReader implements RecordReader<ImmutableBytesWritable, KeyValue> {
- private HFile.Reader reader;
- private final HFileScanner scanner;
- private int entryNumber;
- public HFileRecordReader(FileSplit split, JobConf conf) throws IOException {
- final Path path = split.getPath();
- HFile.Reader reader = new HFile.Reader(FileSystem.get(conf), path, null, false);
- scanner = reader.getScanner(false, false);
- reader.loadFileInfo(); // This is required or else seekTo throws a NPE
- scanner.seekTo(); // This is required or else scanner.next throws an error
- }
- @Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- }
- @Override
- public ImmutableBytesWritable createKey() {
- return new ImmutableBytesWritable(scanner.getKeyValue().getRow());
- }
- @Override
- public KeyValue createValue() {
- return scanner.getKeyValue();
- }
- @Override
- public long getPos() throws IOException {
- return entryNumber;
- }
- @Override
- public float getProgress() throws IOException {
- if (reader != null) {
- return (entryNumber / reader.getEntries());
- }
- return 1;
- }
- @Override
- public boolean next(ImmutableBytesWritable key, KeyValue value) throws IOException {
- entryNumber++;
- return scanner.next();
- }
- }
- @Override
- public RecordReader<ImmutableBytesWritable, KeyValue> getRecordReader(InputSplit split, JobConf conf,
- Reporter reporter) throws IOException {
- return new HFileRecordReader((FileSplit) split, conf);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement