Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

RowRangeFilter

By: a guest on Feb 20th, 2013  |  syntax: None  |  size: 9.30 KB  |  views: 118  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. package com.cerner.kepler.filters;
  2.  
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. import java.io.Serializable;
  7. import java.util.Arrays;
  8. import java.util.Comparator;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. import java.util.TreeSet;
  12.  
  13. import org.apache.hadoop.hbase.KeyValue;
  14. import org.apache.hadoop.hbase.filter.FilterBase;
  15. import org.apache.hadoop.hbase.util.Bytes;
  16. import org.apache.hadoop.hbase.util.Pair;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19.  
  20. public class RowRangeFilter extends FilterBase {
  21.     // Go go Row Ranger, the might filterin' row ranger!
  22.  
  23.     /**
  24.      * Logger
  25.      */
  26.     private static final Logger LOG = LoggerFactory.getLogger(RowRangeFilter.class);
  27.  
  28.     /**
  29.      * Boolean to indicate if we have loaded our scan into the first range
  30.      */
  31.     private boolean loaded = false;
  32.  
  33.     /**
  34.      * Boolean to indicate if we should abort the scan (true=abort)
  35.      */
  36.     private boolean filterAllRemaining = false;
  37.  
  38.     /**
  39.      * An object which determines what our filter does
  40.      */
  41.     private ReturnCode returnCode;
  42.  
  43.     /**
  44.      * The current byte[] pair that represents our row key range we are in
  45.      */
  46.     private Pair<byte[], byte[]> currentRange;
  47.  
  48.     /**
  49.      * A sorted iterator contained our row key ranges
  50.      */
  51.     private Iterator<Pair<byte[], byte[]>> rowKeyIterator;
  52.  
  53.     private int iteratorCount;
  54.    
  55.     /**
  56.      * Constructor used by HBase. DO NOT USE!
  57.      */
  58.     public RowRangeFilter() {
  59.         super();
  60.         LOG.debug("Creating new filter server side");
  61.     }
  62.  
  63.     /**
  64.      * Constructs the filter from a set of row key ranges represented by {@link Pair}s of byte[]'s.
  65.      *
  66.      * @param ranges
  67.      *            the set of row key ranges we want to filter on
  68.      * @throws IllegalArgumentException
  69.      *             if ranges is null or empty, or if any range's starting value is < then its end, or if any range overlaps with
  70.      *             another range
  71.      */
  72.     public RowRangeFilter(Set<Pair<byte[], byte[]>> ranges) {
  73.  
  74.         if (ranges == null)
  75.             throw new IllegalArgumentException("ranges cannot be null");
  76.         if (ranges.isEmpty())
  77.             throw new IllegalArgumentException("ranges must contain at least 1 pair");
  78.  
  79.         // By moving our data to a TreeSet we sort our data into the correct order before handing it off to hbase-server
  80.         TreeSet<Pair<byte[], byte[]>> treeSet = new TreeSet<Pair<byte[], byte[]>>(
  81.                 new FirstValuePairByteComparator());
  82.  
  83.         // TODO: Do we combine or throw exception or even ignore?
  84.  
  85.         Iterator<Pair<byte[], byte[]>> iterator = ranges.iterator();
  86.         while (iterator.hasNext()) {
  87.             Pair<byte[], byte[]> range = iterator.next();
  88.  
  89.             // Verify that each range is valid (i.e. start <= end)
  90.             if (Bytes.compareTo(range.getFirst(), range.getSecond()) > 0)
  91.                 throw new IllegalArgumentException(
  92.                         "The following range is invalid since its starting value is less then its end : "
  93.                                 + bytePairToString(range));
  94.  
  95.             // Verify that this new range does not overlap any existing ranges
  96.             for (Pair<byte[], byte[]> addedRange : treeSet) {
  97.                 // Verify that range's start point is not contained in addedRange
  98.                 if (Bytes.compareTo(range.getFirst(), addedRange.getFirst()) >= 0
  99.                         && Bytes.compareTo(range.getFirst(), addedRange.getSecond()) < 0)
  100.                     throw new IllegalArgumentException("The range : " + bytePairToString(range)
  101.                             + " overlaps an existing range : " + treeSet.remove(range));
  102.  
  103.                 // Verify that range's end point is not contained in addedRange
  104.                 if (Bytes.compareTo(range.getSecond(), addedRange.getFirst()) > 0
  105.                         && Bytes.compareTo(range.getSecond(), addedRange.getSecond()) <= 0)
  106.                     throw new IllegalArgumentException("The range : " + bytePairToString(range)
  107.                             + " overlaps an existing range : " + bytePairToString(addedRange));
  108.             }
  109.  
  110.             treeSet.add(range);
  111.         }
  112.         rowKeyIterator = treeSet.iterator();
  113.         iteratorCount = treeSet.size();
  114.     }
  115.  
  116.     @Override
  117.     public ReturnCode filterKeyValue(KeyValue ignored) {
  118.         return returnCode;
  119.     }
  120.  
  121.     @Override
  122.     public boolean filterAllRemaining() {
  123.         return filterAllRemaining;
  124.     }
  125.  
  126.     @Override
  127.     public KeyValue getNextKeyHint(KeyValue kv) {
  128.         return KeyValue.createFirstOnRow(currentRange.getFirst());
  129.     }
  130.  
  131.     @Override
  132.     public boolean filterRowKey(byte[] buffer, int offset, int length) {
  133.  
  134.         byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);
  135.  
  136.         // If this is the first time filtering seek to the appropriate range
  137.         if (!loaded) {
  138.             LOG.debug("Loading currentRange");
  139.             while (rowKeyIterator.hasNext()) {
  140.                 currentRange = rowKeyIterator.next();
  141.                 if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  142.                     break;
  143.                 }
  144.             }
  145.  
  146.             loaded = true;
  147.  
  148.             if (Bytes.compareTo(rowKey, currentRange.getFirst()) < 0) {
  149.                 if (LOG.isDebugEnabled())
  150.                     LOG.debug(
  151.                             "Current rowKey: {} is not in first range : {}, seeking to start of range",
  152.                             Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  153.                 returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  154.                 return true;
  155.             }
  156.         }
  157.  
  158.         // Verify that the rowKey is still within our range
  159.         if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  160.             if (LOG.isDebugEnabled())
  161.                 LOG.debug("Current rowKey : {} is contained within range : {}",
  162.                         Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  163.             returnCode = ReturnCode.INCLUDE;
  164.             return false;
  165.         }
  166.  
  167.         if (rowKeyIterator.hasNext()) {
  168.             if (LOG.isDebugEnabled())
  169.                 LOG.debug(
  170.                         "Current rowKey : {} is not contained by range : {}, seeking to next range",
  171.                         Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  172.             // Seek to the next range
  173.             currentRange = rowKeyIterator.next();
  174.             returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  175.             return true;
  176.         }
  177.  
  178.         if (LOG.isDebugEnabled())
  179.             LOG.debug("Current rowKey : {} is not contained within any ranges, aborting scan",
  180.                     Bytes.toStringBinary(rowKey));
  181.  
  182.         // We don't have another range to seek to so abort the scan
  183.         filterAllRemaining = true;
  184.         // Not sure if I have to set this here
  185.         returnCode = ReturnCode.NEXT_ROW;
  186.         return true;
  187.     }
  188.  
  189.     public void write(DataOutput out) throws IOException {
  190.         out.writeInt(iteratorCount);
  191.         while (rowKeyIterator.hasNext()) {
  192.             Pair<byte[], byte[]> range = rowKeyIterator.next();
  193.  
  194.             byte[] first = range.getFirst();
  195.             out.writeInt(first.length);
  196.             out.write(first);
  197.  
  198.             byte[] second = range.getSecond();
  199.             out.writeInt(second.length);
  200.             out.write(second);
  201.         }
  202.     }
  203.  
  204.     public void readFields(DataInput in) throws IOException {
  205.         TreeSet<Pair<byte[], byte[]>> ranges = new TreeSet<Pair<byte[], byte[]>>(
  206.                 new FirstValuePairByteComparator());
  207.  
  208.         LOG.debug("Reading fields");
  209.        
  210.         // Load ranges into treeset so we can create a sorted iterator
  211.         int totalRanges = in.readInt();
  212.        
  213.         for(int i=0; i<totalRanges; i++){
  214.             byte[] first = new byte[in.readInt()];
  215.             in.readFully(first);
  216.             byte[] second = new byte[in.readInt()];
  217.             in.readFully(second);
  218.  
  219.             ranges.add(Pair.newPair(first, second));
  220.         }
  221.    
  222.         rowKeyIterator = ranges.iterator();
  223.        
  224.         if (LOG.isDebugEnabled())
  225.             LOG.debug("Finished reading fields. Loaded : {} ranges", ranges.size());
  226.     }
  227.  
  228.     private static class FirstValuePairByteComparator implements Comparator<Pair<byte[], byte[]>>, Serializable {
  229.         /**
  230.          * Generated serial version
  231.          */
  232.         private static final long serialVersionUID = 8426766119633439881L;
  233.  
  234.         public int compare(Pair<byte[], byte[]> pair1, Pair<byte[], byte[]> pair2) {
  235.             return Bytes.compareTo(pair1.getFirst(), pair2.getFirst());
  236.         }
  237.     }
  238.  
  239.     /**
  240.      * Simple function used in logging/exceptions to write out a range as a string
  241.      *
  242.      * @param pair
  243.      *            the row key range
  244.      * @return the string representing the row key range
  245.      */
  246.     private static String bytePairToString(Pair<byte[], byte[]> pair) {
  247.         StringBuilder builder = new StringBuilder();
  248.         builder.append("[");
  249.         builder.append(Bytes.toStringBinary(pair.getFirst()));
  250.         builder.append(", ");
  251.         builder.append(Bytes.toStringBinary(pair.getSecond()));
  252.         builder.append("]");
  253.         return builder.toString();
  254.     }
  255. }