package com.cerner.kepler.filters;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A filter that only includes rows whose key falls inside one of a set of non-overlapping row key ranges.
 * <p>
 * Each range is a {@link Pair} of byte[]'s. A row key is treated as inside a range when it is greater than or equal
 * to the pair's first value and strictly less than its second value. Ranges are walked in ascending order of their
 * first value; when a row key falls before the current range the filter asks for a seek to the start of that range,
 * and when a row key is past the last range the remainder of the scan is filtered out.
 * <p>
 * The filter is stateful: it remembers the range it is currently positioned on between calls to
 * {@link #filterRowKey(byte[], int, int)}.
 */
public class RowRangeFilter extends FilterBase {

    /**
     * Logger
     */
    private static final Logger LOG = LoggerFactory.getLogger(RowRangeFilter.class);

    /**
     * Boolean to indicate if we should abort the scan (true=abort)
     */
    private boolean filterAllRemaining = false;

    /**
     * The decision made by the last call to {@link #filterRowKey(byte[], int, int)}, handed back from
     * {@link #filterKeyValue(KeyValue)}
     */
    private ReturnCode returnCode;

    /**
     * The byte[] pair that represents the row key range we are currently positioned on, or null if we have not
     * positioned on a range yet
     */
    private Pair<byte[], byte[]> currentRange;

    /**
     * The row key ranges sorted by their starting value. Kept so the filter can be written out any number of times
     * without disturbing the filtering position.
     */
    private TreeSet<Pair<byte[], byte[]>> sortedRanges;

    /**
     * Iterator over {@link #sortedRanges} that tracks how far filtering has advanced
     */
    private Iterator<Pair<byte[], byte[]>> rowKeyIterator;

    /**
     * Constructor used by HBase. DO NOT USE!
     * <p>
     * The filter starts with no ranges; they are expected to be supplied through {@link #readFields(DataInput)}.
     */
    public RowRangeFilter() {
        super();
        LOG.debug("Creating new filter server side");
        sortedRanges = new TreeSet<Pair<byte[], byte[]>>(new FirstValuePairByteComparator());
        rowKeyIterator = sortedRanges.iterator();
    }

    /**
     * Constructs the filter from a set of row key ranges represented by {@link Pair}s of byte[]'s.
     *
     * @param ranges
     *            the set of row key ranges we want to filter on
     * @throws IllegalArgumentException
     *             if ranges is null or empty, if any range or range boundary is null, if any range's starting value
     *             is greater than its end, or if any range overlaps with another range
     */
    public RowRangeFilter(Set<Pair<byte[], byte[]>> ranges) {
        if (ranges == null) {
            throw new IllegalArgumentException("ranges cannot be null");
        }
        if (ranges.isEmpty()) {
            throw new IllegalArgumentException("ranges must contain at least 1 pair");
        }

        // By moving our data to a TreeSet we sort our data into the correct order before handing it off to
        // hbase-server
        TreeSet<Pair<byte[], byte[]>> treeSet = new TreeSet<Pair<byte[], byte[]>>(
                new FirstValuePairByteComparator());

        // TODO: Do we combine or throw exception or even ignore?
        for (Pair<byte[], byte[]> range : ranges) {
            if (range == null || range.getFirst() == null || range.getSecond() == null) {
                throw new IllegalArgumentException("ranges must not contain null pairs or null boundaries");
            }

            // Verify that each range is valid (i.e. start <= end)
            if (Bytes.compareTo(range.getFirst(), range.getSecond()) > 0) {
                throw new IllegalArgumentException(
                        "The following range is invalid since its starting value is greater than its end : "
                                + bytePairToString(range));
            }

            // Verify that this new range does not overlap any existing ranges
            for (Pair<byte[], byte[]> addedRange : treeSet) {
                if (overlaps(range, addedRange)) {
                    throw new IllegalArgumentException("The range : " + bytePairToString(range)
                            + " overlaps an existing range : " + bytePairToString(addedRange));
                }
            }

            treeSet.add(range);
        }

        sortedRanges = treeSet;
        rowKeyIterator = treeSet.iterator();
    }

    /**
     * Returns the decision reached by the most recent call to {@link #filterRowKey(byte[], int, int)}.
     *
     * @param ignored
     *            the key value being filtered; not inspected
     * @return the stored return code, which is null until a row key has been filtered
     */
    @Override
    public ReturnCode filterKeyValue(KeyValue ignored) {
        return returnCode;
    }

    /**
     * @return true once a row key past the last range has been seen
     */
    @Override
    public boolean filterAllRemaining() {
        return filterAllRemaining;
    }

    /**
     * Builds the seek hint pointing at the first possible key of the current range's starting row.
     *
     * @param kv
     *            the key value currently being filtered; not inspected
     * @return a key value positioned at the start of the current range
     */
    @Override
    public KeyValue getNextKeyHint(KeyValue kv) {
        return KeyValue.createFirstOnRow(currentRange.getFirst());
    }

    /**
     * Decides whether the given row key is excluded and records the matching {@link ReturnCode}.
     * <p>
     * The current range is advanced until its end lies beyond the row key. The row is then either included (it is
     * inside the range) or excluded with a seek hint (it is before the start of the range). If the ranges run out,
     * the rest of the scan is filtered.
     *
     * @param buffer
     *            the buffer containing the row key
     * @param offset
     *            the offset of the row key within the buffer
     * @param length
     *            the length of the row key
     * @return true if the row should be excluded, false if it should be included
     */
    @Override
    public boolean filterRowKey(byte[] buffer, int offset, int length) {
        byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);

        // If this is the first time filtering, position on the first range
        if (currentRange == null) {
            LOG.debug("Loading currentRange");
            if (!rowKeyIterator.hasNext()) {
                return abortScan(rowKey);
            }
            currentRange = rowKeyIterator.next();
        }

        // Skip every range that ends at or before the rowKey. Advancing more than one range at a time matters:
        // the rowKey may already be inside (or past) the following range, and must not be dropped because of it.
        while (Bytes.compareTo(rowKey, currentRange.getSecond()) >= 0) {
            if (!rowKeyIterator.hasNext()) {
                return abortScan(rowKey);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Current rowKey : {} is not contained by range : {}, seeking to next range",
                        Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
            }
            currentRange = rowKeyIterator.next();
        }

        // The rowKey is before the end of the current range; it is only included if it is also at or past the start
        if (Bytes.compareTo(rowKey, currentRange.getFirst()) < 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Current rowKey : {} precedes range : {}, seeking to start of range",
                        Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
            }
            returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
            return true;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Current rowKey : {} is contained within range : {}", Bytes.toStringBinary(rowKey),
                    bytePairToString(currentRange));
        }
        returnCode = ReturnCode.INCLUDE;
        return false;
    }

    /**
     * Writes the number of ranges followed by each range's start and end, in sorted order.
     * <p>
     * Iterates the stored set rather than the filtering iterator, so the filter can be written repeatedly and
     * writing does not change the filtering position.
     *
     * @param out
     *            the output to write to
     * @throws IOException
     *             if writing fails
     */
    public void write(DataOutput out) throws IOException {
        LOG.debug("Writing out filter data");

        int rangeCount = sortedRanges.size();
        out.writeInt(rangeCount);
        LOG.debug("iteratorCount : {}", rangeCount);

        for (Pair<byte[], byte[]> range : sortedRanges) {
            LOG.debug("Writing range");
            Bytes.writeByteArray(out, range.getFirst());
            Bytes.writeByteArray(out, range.getSecond());
        }
    }

    /**
     * Reads the ranges previously written by {@link #write(DataOutput)} and resets the filtering position.
     *
     * @param in
     *            the input to read from
     * @throws IOException
     *             if reading fails
     */
    public void readFields(DataInput in) throws IOException {
        TreeSet<Pair<byte[], byte[]>> ranges = new TreeSet<Pair<byte[], byte[]>>(
                new FirstValuePairByteComparator());

        LOG.debug("Reading fields");

        // Load ranges into treeset so we can create a sorted iterator
        int totalRanges = in.readInt();
        LOG.debug("Read ranges {}", totalRanges);

        for (int i = 0; i < totalRanges; i++) {
            byte[] first = Bytes.readByteArray(in);
            byte[] second = Bytes.readByteArray(in);
            Pair<byte[], byte[]> range = Pair.newPair(first, second);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Read in range {}", bytePairToString(range));
            }
            ranges.add(range);
        }

        sortedRanges = ranges;
        rowKeyIterator = ranges.iterator();

        // The ranges were replaced, so any previous filtering position no longer applies
        currentRange = null;
        returnCode = null;
        filterAllRemaining = false;

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished reading fields. Loaded : {} ranges", ranges.size());
        }
    }

    /**
     * Marks the rest of the scan as filtered because no range can contain the given row key or any later one.
     *
     * @param rowKey
     *            the row key that fell past the last range
     * @return always true, so the caller can return it directly to exclude the row
     */
    private boolean abortScan(byte[] rowKey) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Current rowKey : {} is not contained within any ranges, aborting scan",
                    Bytes.toStringBinary(rowKey));
        }
        // We don't have another range to seek to so abort the scan
        filterAllRemaining = true;
        // Not sure if I have to set this here
        returnCode = ReturnCode.NEXT_ROW;
        return true;
    }

    /**
     * Checks whether a candidate range collides with a range that was already accepted.
     *
     * @param candidate
     *            the range about to be added
     * @param existing
     *            a range that has already been added
     * @return true if the candidate's start or end lies inside the existing range, or if the candidate strictly
     *         encloses the existing range
     */
    private static boolean overlaps(Pair<byte[], byte[]> candidate, Pair<byte[], byte[]> existing) {
        int startVsStart = Bytes.compareTo(candidate.getFirst(), existing.getFirst());
        int startVsEnd = Bytes.compareTo(candidate.getFirst(), existing.getSecond());
        int endVsStart = Bytes.compareTo(candidate.getSecond(), existing.getFirst());
        int endVsEnd = Bytes.compareTo(candidate.getSecond(), existing.getSecond());

        // Candidate's start point is contained in the existing range
        boolean startInside = startVsStart >= 0 && startVsEnd < 0;
        // Candidate's end point is contained in the existing range
        boolean endInside = endVsStart > 0 && endVsEnd <= 0;
        // Neither end point is inside, but the candidate swallows the existing range whole
        boolean encloses = startVsStart < 0 && endVsEnd > 0;

        return startInside || endInside || encloses;
    }

    /**
     * Orders ranges by their starting value only.
     */
    private static class FirstValuePairByteComparator implements Comparator<Pair<byte[], byte[]>>, Serializable {

        /**
         * Generated serial version
         */
        private static final long serialVersionUID = 8426766119633439881L;

        public int compare(Pair<byte[], byte[]> pair1, Pair<byte[], byte[]> pair2) {
            return Bytes.compareTo(pair1.getFirst(), pair2.getFirst());
        }
    }

    /**
     * Simple function used in logging/exceptions to write out a range as a string
     *
     * @param pair
     *            the row key range
     * @return the string representing the row key range
     */
    private static String bytePairToString(Pair<byte[], byte[]> pair) {
        StringBuilder builder = new StringBuilder();
        builder.append("[");
        builder.append(Bytes.toStringBinary(pair.getFirst()));
        builder.append(", ");
        builder.append(Bytes.toStringBinary(pair.getSecond()));
        builder.append("]");
        return builder.toString();
    }
}