Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.cerner.kepler.filters;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.io.Serializable;
- import java.util.Arrays;
- import java.util.Comparator;
- import java.util.Iterator;
- import java.util.Set;
- import java.util.TreeSet;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.filter.FilterBase;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.hbase.util.Pair;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class RowRangeFilter extends FilterBase {
- /**
- * Logger
- */
- private static final Logger LOG = LoggerFactory.getLogger(RowRangeFilter.class);
- /**
- * Boolean to indicate if we have loaded our scan into the first range
- */
- private boolean loaded = false;
- /**
- * Boolean to indicate if we should abort the scan (true=abort)
- */
- private boolean filterAllRemaining = false;
- /**
- * An object which determines what our filter does
- */
- private ReturnCode returnCode;
- /**
- * The current byte[] pair that represents our row key range we are in
- */
- private Pair<byte[], byte[]> currentRange;
- /**
- * A sorted iterator contained our row key ranges
- */
- private Iterator<Pair<byte[], byte[]>> rowKeyIterator;
- private int iteratorCount;
- /**
- * Constructor used by HBase. DO NOT USE!
- */
- public RowRangeFilter() {
- super();
- LOG.debug("Creating new filter server side");
- }
- /**
- * Constructs the filter from a set of row key ranges represented by {@link Pair}s of byte[]'s.
- *
- * @param ranges
- * the set of row key ranges we want to filter on
- * @throws IllegalArgumentException
- * if ranges is null or empty, or if any range's starting value is < then its end, or if any range overlaps with
- * another range
- */
- public RowRangeFilter(Set<Pair<byte[], byte[]>> ranges) {
- if (ranges == null)
- throw new IllegalArgumentException("ranges cannot be null");
- if (ranges.isEmpty())
- throw new IllegalArgumentException("ranges must contain at least 1 pair");
- // By moving our data to a TreeSet we sort our data into the correct order before handing it off to hbase-server
- TreeSet<Pair<byte[], byte[]>> treeSet = new TreeSet<Pair<byte[], byte[]>>(
- new FirstValuePairByteComparator());
- // TODO: Do we combine or throw exception or even ignore?
- Iterator<Pair<byte[], byte[]>> iterator = ranges.iterator();
- while (iterator.hasNext()) {
- Pair<byte[], byte[]> range = iterator.next();
- // Verify that each range is valid (i.e. start <= end)
- if (Bytes.compareTo(range.getFirst(), range.getSecond()) > 0)
- throw new IllegalArgumentException(
- "The following range is invalid since its starting value is less then its end : "
- + bytePairToString(range));
- // Verify that this new range does not overlap any existing ranges
- for (Pair<byte[], byte[]> addedRange : treeSet) {
- // Verify that range's start point is not contained in addedRange
- if (Bytes.compareTo(range.getFirst(), addedRange.getFirst()) >= 0
- && Bytes.compareTo(range.getFirst(), addedRange.getSecond()) < 0)
- throw new IllegalArgumentException("The range : " + bytePairToString(range)
- + " overlaps an existing range : " + treeSet.remove(range));
- // Verify that range's end point is not contained in addedRange
- if (Bytes.compareTo(range.getSecond(), addedRange.getFirst()) > 0
- && Bytes.compareTo(range.getSecond(), addedRange.getSecond()) <= 0)
- throw new IllegalArgumentException("The range : " + bytePairToString(range)
- + " overlaps an existing range : " + bytePairToString(addedRange));
- }
- treeSet.add(range);
- }
- rowKeyIterator = treeSet.iterator();
- iteratorCount = treeSet.size();
- }
- @Override
- public ReturnCode filterKeyValue(KeyValue ignored) {
- return returnCode;
- }
- @Override
- public boolean filterAllRemaining() {
- return filterAllRemaining;
- }
- @Override
- public KeyValue getNextKeyHint(KeyValue kv) {
- return KeyValue.createFirstOnRow(currentRange.getFirst());
- }
- @Override
- public boolean filterRowKey(byte[] buffer, int offset, int length) {
- byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);
- // If this is the first time filtering seek to the appropriate range
- if (!loaded) {
- LOG.debug("Loading currentRange");
- while (rowKeyIterator.hasNext()) {
- currentRange = rowKeyIterator.next();
- if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
- break;
- }
- }
- loaded = true;
- if (Bytes.compareTo(rowKey, currentRange.getFirst()) < 0) {
- if (LOG.isDebugEnabled())
- LOG.debug(
- "Current rowKey: {} is not in first range : {}, seeking to start of range",
- Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
- returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
- return true;
- }
- }
- // Verify that the rowKey is still within our range
- if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
- if (LOG.isDebugEnabled())
- LOG.debug("Current rowKey : {} is contained within range : {}",
- Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
- returnCode = ReturnCode.INCLUDE;
- return false;
- }
- if (rowKeyIterator.hasNext()) {
- if (LOG.isDebugEnabled())
- LOG.debug(
- "Current rowKey : {} is not contained by range : {}, seeking to next range",
- Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
- // Seek to the next range
- currentRange = rowKeyIterator.next();
- returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
- return true;
- }
- if (LOG.isDebugEnabled())
- LOG.debug("Current rowKey : {} is not contained within any ranges, aborting scan",
- Bytes.toStringBinary(rowKey));
- // We don't have another range to seek to so abort the scan
- filterAllRemaining = true;
- // Not sure if I have to set this here
- returnCode = ReturnCode.NEXT_ROW;
- return true;
- }
- public void write(DataOutput out) throws IOException {
- LOG.debug("Writing out filter data");
- out.writeInt(iteratorCount);
- LOG.debug("iteratorCount : {}", iteratorCount);
- while (rowKeyIterator.hasNext()) {
- LOG.debug("Writing range");
- Pair<byte[], byte[]> range = rowKeyIterator.next();
- Bytes.writeByteArray(out, range.getFirst());
- Bytes.writeByteArray(out, range.getSecond());
- }
- }
- public void readFields(DataInput in) throws IOException {
- TreeSet<Pair<byte[], byte[]>> ranges = new TreeSet<Pair<byte[], byte[]>>(
- new FirstValuePairByteComparator());
- LOG.debug("Reading fields");
- // Load ranges into treeset so we can create a sorted iterator
- int totalRanges = in.readInt();
- LOG.debug("Read ranges {}", totalRanges);
- for(int i=0; i<totalRanges; i++){
- LOG.debug("Reading range #{}", i+1);
- byte[] first = Bytes.readByteArray(in);
- LOG.debug("Read first range with length {}", first.length);
- byte[] second = Bytes.readByteArray(in);
- LOG.debug("Read second range with length {}", second.length);
- Pair<byte[], byte[]> range = Pair.newPair(first, second);
- LOG.debug("Read in range {}", bytePairToString(range));
- ranges.add(range);
- }
- rowKeyIterator = ranges.iterator();
- if (LOG.isDebugEnabled())
- LOG.debug("Finished reading fields. Loaded : {} ranges", ranges.size());
- }
- private static class FirstValuePairByteComparator implements Comparator<Pair<byte[], byte[]>>, Serializable {
- /**
- * Generated serial version
- */
- private static final long serialVersionUID = 8426766119633439881L;
- public int compare(Pair<byte[], byte[]> pair1, Pair<byte[], byte[]> pair2) {
- return Bytes.compareTo(pair1.getFirst(), pair2.getFirst());
- }
- }
- /**
- * Simple function used in logging/exceptions to write out a range as a string
- *
- * @param pair
- * the row key range
- * @return the string representing the row key range
- */
- private static String bytePairToString(Pair<byte[], byte[]> pair) {
- StringBuilder builder = new StringBuilder();
- builder.append("[");
- builder.append(Bytes.toStringBinary(pair.getFirst()));
- builder.append(", ");
- builder.append(Bytes.toStringBinary(pair.getSecond()));
- builder.append("]");
- return builder.toString();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement