package com.cerner.kepler.filters;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HBase filter that restricts a scan to a set of non-overlapping row key ranges.
 * <p>
 * Each range is a {@link Pair} whose first element is the inclusive start row key and whose second element is the
 * exclusive end row key: a row key {@code r} belongs to the range {@code [s, e)} when {@code s <= r < e}.
 * <p>
 * The filter is stateful. It walks its ranges in ascending start order as rows are presented, so it assumes rows
 * arrive in ascending row key order. Rows before the current range are rejected and the scanner is asked to seek to
 * the range start (via {@link #getNextKeyHint(KeyValue)}). Once a row lies past every range, the remainder of the
 * scan is aborted (via {@link #filterAllRemaining()}).
 */
public class RowRangeFilter extends FilterBase {
    // Go go Row Ranger, the mighty filterin' row ranger!

    /**
     * Logger
     */
    private static final Logger LOG = LoggerFactory.getLogger(RowRangeFilter.class);

    /**
     * Whether {@link #currentRange} has been positioned for the first row this filter has seen.
     */
    private boolean loaded = false;

    /**
     * Whether the scan should be aborted (true = abort).
     */
    private boolean filterAllRemaining = false;

    /**
     * The decision made by the last {@link #filterRowKey(byte[], int, int)} call, reported for every cell of that row
     * by {@link #filterKeyValue(KeyValue)}. Remains {@code null} until the first row key is filtered.
     */
    private ReturnCode returnCode;

    /**
     * The row key range ({@code [first, second)}) the scan is currently in or seeking to.
     */
    private Pair<byte[], byte[]> currentRange;

    /**
     * All row key ranges, sorted by start row key. Kept separately from {@link #rowKeyIterator} because the iterator is
     * single-use, while serialization must always be able to see every range.
     */
    private TreeSet<Pair<byte[], byte[]>> sortedRanges;

    /**
     * Iterator over {@link #sortedRanges}, used to advance through the ranges while filtering.
     */
    private Iterator<Pair<byte[], byte[]>> rowKeyIterator;

    /**
     * Constructor used by HBase when creating the filter server side; the ranges are then populated by
     * {@link #readFields(DataInput)}. DO NOT USE!
     */
    public RowRangeFilter() {
        super();
        LOG.debug("Creating new filter server side");
        // Start from an empty range set so write()/filterRowKey() never see null collections.
        sortedRanges = new TreeSet<Pair<byte[], byte[]>>(new FirstValuePairByteComparator());
        rowKeyIterator = sortedRanges.iterator();
    }

    /**
     * Constructs the filter from a set of row key ranges represented by {@link Pair}s of byte[]'s. Each pair is
     * {@code [start, end)}: start inclusive, end exclusive.
     *
     * @param ranges
     *            the set of row key ranges we want to filter on
     * @throws IllegalArgumentException
     *             if ranges is null or empty, contains a null pair or a null row key, if any range's starting value is
     *             greater than its end, or if any range overlaps another range (ranges sharing the same start row key
     *             are also rejected; adjacent ranges such as [a, b) and [b, c) are allowed)
     */
    public RowRangeFilter(Set<Pair<byte[], byte[]>> ranges) {
        if (ranges == null)
            throw new IllegalArgumentException("ranges cannot be null");
        if (ranges.isEmpty())
            throw new IllegalArgumentException("ranges must contain at least 1 pair");

        // By moving our data to a TreeSet we sort our data into the correct order before handing it off to hbase-server
        TreeSet<Pair<byte[], byte[]>> sorted = new TreeSet<Pair<byte[], byte[]>>(new FirstValuePairByteComparator());
        // TODO: Do we combine or throw exception or even ignore?
        for (Pair<byte[], byte[]> range : ranges) {
            if (range == null || range.getFirst() == null || range.getSecond() == null)
                throw new IllegalArgumentException("ranges cannot contain a null range or a null row key");
            // Verify that each range is valid (i.e. start <= end)
            if (Bytes.compareTo(range.getFirst(), range.getSecond()) > 0)
                throw new IllegalArgumentException(
                        "The following range is invalid since its starting value is greater than its end : "
                                + bytePairToString(range));
            // Verify that this new range does not overlap any existing ranges
            for (Pair<byte[], byte[]> addedRange : sorted) {
                if (overlaps(range, addedRange))
                    throw new IllegalArgumentException("The range : " + bytePairToString(range)
                            + " overlaps an existing range : " + bytePairToString(addedRange));
            }
            sorted.add(range);
        }
        sortedRanges = sorted;
        rowKeyIterator = sorted.iterator();
    }

    /**
     * Returns whether {@code candidate} conflicts with {@code existing}. A conflict exists when any of the following
     * holds:
     * <ul>
     * <li>the candidate's start lies in {@code [existing.start, existing.end)};</li>
     * <li>its end lies in {@code (existing.start, existing.end]};</li>
     * <li>it strictly encloses {@code existing};</li>
     * <li>both ranges share the same start, which the start-keyed TreeSet could not hold together.</li>
     * </ul>
     */
    private static boolean overlaps(Pair<byte[], byte[]> candidate, Pair<byte[], byte[]> existing) {
        byte[] start = candidate.getFirst();
        byte[] end = candidate.getSecond();
        byte[] existingStart = existing.getFirst();
        byte[] existingEnd = existing.getSecond();

        int startVsExistingStart = Bytes.compareTo(start, existingStart);
        boolean sameStart = startVsExistingStart == 0;
        boolean startInside = startVsExistingStart >= 0 && Bytes.compareTo(start, existingEnd) < 0;
        boolean endInside = Bytes.compareTo(end, existingStart) > 0 && Bytes.compareTo(end, existingEnd) <= 0;
        boolean encloses = startVsExistingStart < 0 && Bytes.compareTo(end, existingEnd) > 0;
        return sameStart || startInside || endInside || encloses;
    }

    /**
     * Reports, for every cell of the current row, the decision made by the last {@link #filterRowKey} call.
     */
    @Override
    public ReturnCode filterKeyValue(KeyValue ignored) {
        return returnCode;
    }

    @Override
    public boolean filterAllRemaining() {
        return filterAllRemaining;
    }

    /**
     * Returns the first possible key of the current range's start row; used when {@link #filterKeyValue} reports
     * {@link ReturnCode#SEEK_NEXT_USING_HINT}.
     */
    @Override
    public KeyValue getNextKeyHint(KeyValue kv) {
        return KeyValue.createFirstOnRow(currentRange.getFirst());
    }

    /**
     * Decides whether the given row is filtered out, and records the matching {@link ReturnCode}.
     * <p>
     * Every range whose (exclusive) end is at or before the row key is skipped. A single row can jump past several
     * ranges, e.g. when a range contains no data. Afterwards:
     * <ul>
     * <li>a row before the current range's start is rejected with a seek hint to that start;</li>
     * <li>a row inside the range is included;</li>
     * <li>if no range is left, the scan is aborted.</li>
     * </ul>
     *
     * @return {@code true} if the row is filtered out, {@code false} if it is included
     */
    @Override
    public boolean filterRowKey(byte[] buffer, int offset, int length) {
        byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);

        if (filterAllRemaining) {
            // Every range has already been passed; nothing further can match.
            return true;
        }

        // If this is the first time filtering, start at the first range
        if (!loaded) {
            LOG.debug("Loading currentRange");
            loaded = true;
            if (!rowKeyIterator.hasNext()) {
                return abortScan(rowKey);
            }
            currentRange = rowKeyIterator.next();
        }

        // Advance past every range that ends at or before this row key
        while (Bytes.compareTo(rowKey, currentRange.getSecond()) >= 0) {
            if (!rowKeyIterator.hasNext()) {
                return abortScan(rowKey);
            }
            currentRange = rowKeyIterator.next();
        }

        // The row is before the start of the current range: skip it and seek to the range start
        if (Bytes.compareTo(rowKey, currentRange.getFirst()) < 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Current rowKey : {} is before range : {}, seeking to start of range",
                        Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
            }
            returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
            return true;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Current rowKey : {} is contained within range : {}",
                    Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
        }
        returnCode = ReturnCode.INCLUDE;
        return false;
    }

    /**
     * Marks the scan as finished because every range ends at or before {@code rowKey}.
     *
     * @return always {@code true} (the row is filtered out)
     */
    private boolean abortScan(byte[] rowKey) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Current rowKey : {} is not contained within any ranges, aborting scan",
                    Bytes.toStringBinary(rowKey));
        }
        // We don't have another range to seek to so abort the scan
        filterAllRemaining = true;
        returnCode = ReturnCode.NEXT_ROW;
        return true;
    }

    /**
     * Serializes the ranges as an int count followed by, for each range in ascending start order, the length-prefixed
     * start and end row keys. Iterates the stored ranges (not {@link #rowKeyIterator}), so the filter can be
     * serialized any number of times.
     */
    public void write(DataOutput out) throws IOException {
        out.writeInt(sortedRanges.size());
        for (Pair<byte[], byte[]> range : sortedRanges) {
            writeRowKey(out, range.getFirst());
            writeRowKey(out, range.getSecond());
        }
    }

    /**
     * Reads ranges in the format produced by {@link #write(DataOutput)} and resets the filter so that filtering starts
     * again from the first range.
     *
     * @throws IOException
     *             if the input cannot be read or contains a negative row key length
     */
    public void readFields(DataInput in) throws IOException {
        // Load ranges into treeset so we can create a sorted iterator
        TreeSet<Pair<byte[], byte[]>> ranges = new TreeSet<Pair<byte[], byte[]>>(new FirstValuePairByteComparator());
        LOG.debug("Reading fields");
        int totalRanges = in.readInt();
        for (int i = 0; i < totalRanges; i++) {
            byte[] first = readRowKey(in);
            byte[] second = readRowKey(in);
            ranges.add(Pair.newPair(first, second));
        }
        sortedRanges = ranges;
        rowKeyIterator = ranges.iterator();

        // Reset per-scan state so the filter begins at the first range
        loaded = false;
        filterAllRemaining = false;
        currentRange = null;
        returnCode = null;

        if (LOG.isDebugEnabled())
            LOG.debug("Finished reading fields. Loaded : {} ranges", ranges.size());
    }

    /** Writes {@code key} as an int length followed by its bytes. */
    private static void writeRowKey(DataOutput out, byte[] key) throws IOException {
        out.writeInt(key.length);
        out.write(key);
    }

    /** Reads a row key written by {@link #writeRowKey(DataOutput, byte[])}, rejecting negative lengths. */
    private static byte[] readRowKey(DataInput in) throws IOException {
        int length = in.readInt();
        if (length < 0)
            throw new IOException("Invalid row key length in serialized RowRangeFilter: " + length);
        byte[] key = new byte[length];
        in.readFully(key);
        return key;
    }

    /**
     * Orders ranges by their start row key only (lexicographic byte order via {@link Bytes#compareTo}).
     */
    private static class FirstValuePairByteComparator implements Comparator<Pair<byte[], byte[]>>, Serializable {
        /**
         * Generated serial version
         */
        private static final long serialVersionUID = 8426766119633439881L;

        public int compare(Pair<byte[], byte[]> pair1, Pair<byte[], byte[]> pair2) {
            return Bytes.compareTo(pair1.getFirst(), pair2.getFirst());
        }
    }

    /**
     * Simple function used in logging/exceptions to write out a range as a string
     *
     * @param pair
     *            the row key range
     * @return the string representing the row key range, formatted as {@code [start, end]}
     */
    private static String bytePairToString(Pair<byte[], byte[]> pair) {
        StringBuilder builder = new StringBuilder();
        builder.append("[");
        builder.append(Bytes.toStringBinary(pair.getFirst()));
        builder.append(", ");
        builder.append(Bytes.toStringBinary(pair.getSecond()));
        builder.append("]");
        return builder.toString();
    }
}