Pastebin launched a little side project called HostCabi.net, check it out ;-) Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Feb 20th, 2013  |  syntax: None  |  size: 9.59 KB  |  hits: 70  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. package com.cerner.kepler.filters;
  2.  
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. import java.io.Serializable;
  7. import java.util.Arrays;
  8. import java.util.Comparator;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. import java.util.TreeSet;
  12.  
  13. import org.apache.hadoop.hbase.KeyValue;
  14. import org.apache.hadoop.hbase.filter.FilterBase;
  15. import org.apache.hadoop.hbase.util.Bytes;
  16. import org.apache.hadoop.hbase.util.Pair;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19.  
  20. public class RowRangeFilter extends FilterBase {
  21.  
  22.     /**
  23.      * Logger
  24.      */
  25.     private static final Logger LOG = LoggerFactory.getLogger(RowRangeFilter.class);
  26.  
  27.     /**
  28.      * Boolean to indicate if we have loaded our scan into the first range
  29.      */
  30.     private boolean loaded = false;
  31.  
  32.     /**
  33.      * Boolean to indicate if we should abort the scan (true=abort)
  34.      */
  35.     private boolean filterAllRemaining = false;
  36.  
  37.     /**
  38.      * An object which determines what our filter does
  39.      */
  40.     private ReturnCode returnCode;
  41.  
  42.     /**
  43.      * The current byte[] pair that represents our row key range we are in
  44.      */
  45.     private Pair<byte[], byte[]> currentRange;
  46.  
  47.     /**
  48.      * A sorted iterator contained our row key ranges
  49.      */
  50.     private Iterator<Pair<byte[], byte[]>> rowKeyIterator;
  51.  
  52.     private int iteratorCount;
  53.    
  54.     /**
  55.      * Constructor used by HBase. DO NOT USE!
  56.      */
  57.     public RowRangeFilter() {
  58.         super();
  59.         LOG.debug("Creating new filter server side");
  60.     }
  61.  
  62.     /**
  63.      * Constructs the filter from a set of row key ranges represented by {@link Pair}s of byte[]'s.
  64.      *
  65.      * @param ranges
  66.      *            the set of row key ranges we want to filter on
  67.      * @throws IllegalArgumentException
  68.      *             if ranges is null or empty, or if any range's starting value is < then its end, or if any range overlaps with
  69.      *             another range
  70.      */
  71.     public RowRangeFilter(Set<Pair<byte[], byte[]>> ranges) {
  72.  
  73.         if (ranges == null)
  74.             throw new IllegalArgumentException("ranges cannot be null");
  75.         if (ranges.isEmpty())
  76.             throw new IllegalArgumentException("ranges must contain at least 1 pair");
  77.  
  78.         // By moving our data to a TreeSet we sort our data into the correct order before handing it off to hbase-server
  79.         TreeSet<Pair<byte[], byte[]>> treeSet = new TreeSet<Pair<byte[], byte[]>>(
  80.                 new FirstValuePairByteComparator());
  81.  
  82.         // TODO: Do we combine or throw exception or even ignore?
  83.  
  84.         Iterator<Pair<byte[], byte[]>> iterator = ranges.iterator();
  85.         while (iterator.hasNext()) {
  86.             Pair<byte[], byte[]> range = iterator.next();
  87.  
  88.             // Verify that each range is valid (i.e. start <= end)
  89.             if (Bytes.compareTo(range.getFirst(), range.getSecond()) > 0)
  90.                 throw new IllegalArgumentException(
  91.                         "The following range is invalid since its starting value is less then its end : "
  92.                                 + bytePairToString(range));
  93.  
  94.             // Verify that this new range does not overlap any existing ranges
  95.             for (Pair<byte[], byte[]> addedRange : treeSet) {
  96.                 // Verify that range's start point is not contained in addedRange
  97.                 if (Bytes.compareTo(range.getFirst(), addedRange.getFirst()) >= 0
  98.                         && Bytes.compareTo(range.getFirst(), addedRange.getSecond()) < 0)
  99.                     throw new IllegalArgumentException("The range : " + bytePairToString(range)
  100.                             + " overlaps an existing range : " + treeSet.remove(range));
  101.  
  102.                 // Verify that range's end point is not contained in addedRange
  103.                 if (Bytes.compareTo(range.getSecond(), addedRange.getFirst()) > 0
  104.                         && Bytes.compareTo(range.getSecond(), addedRange.getSecond()) <= 0)
  105.                     throw new IllegalArgumentException("The range : " + bytePairToString(range)
  106.                             + " overlaps an existing range : " + bytePairToString(addedRange));
  107.             }
  108.  
  109.             treeSet.add(range);
  110.         }
  111.         rowKeyIterator = treeSet.iterator();
  112.         iteratorCount = treeSet.size();
  113.     }
  114.  
  115.     @Override
  116.     public ReturnCode filterKeyValue(KeyValue ignored) {
  117.         return returnCode;
  118.     }
  119.  
  120.     @Override
  121.     public boolean filterAllRemaining() {
  122.         return filterAllRemaining;
  123.     }
  124.  
  125.     @Override
  126.     public KeyValue getNextKeyHint(KeyValue kv) {
  127.         return KeyValue.createFirstOnRow(currentRange.getFirst());
  128.     }
  129.  
  130.     @Override
  131.     public boolean filterRowKey(byte[] buffer, int offset, int length) {
  132.  
  133.         byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);
  134.  
  135.         // If this is the first time filtering seek to the appropriate range
  136.         if (!loaded) {
  137.             LOG.debug("Loading currentRange");
  138.             while (rowKeyIterator.hasNext()) {
  139.                 currentRange = rowKeyIterator.next();
  140.                 if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  141.                     break;
  142.                 }
  143.             }
  144.  
  145.             loaded = true;
  146.  
  147.             if (Bytes.compareTo(rowKey, currentRange.getFirst()) < 0) {
  148.                 if (LOG.isDebugEnabled())
  149.                     LOG.debug(
  150.                             "Current rowKey: {} is not in first range : {}, seeking to start of range",
  151.                             Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  152.                 returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  153.                 return true;
  154.             }
  155.         }
  156.  
  157.         // Verify that the rowKey is still within our range
  158.         if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  159.             if (LOG.isDebugEnabled())
  160.                 LOG.debug("Current rowKey : {} is contained within range : {}",
  161.                         Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  162.             returnCode = ReturnCode.INCLUDE;
  163.             return false;
  164.         }
  165.  
  166.         if (rowKeyIterator.hasNext()) {
  167.             if (LOG.isDebugEnabled())
  168.                 LOG.debug(
  169.                         "Current rowKey : {} is not contained by range : {}, seeking to next range",
  170.                         Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  171.             // Seek to the next range
  172.             currentRange = rowKeyIterator.next();
  173.             returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  174.             return true;
  175.         }
  176.  
  177.         if (LOG.isDebugEnabled())
  178.             LOG.debug("Current rowKey : {} is not contained within any ranges, aborting scan",
  179.                     Bytes.toStringBinary(rowKey));
  180.  
  181.         // We don't have another range to seek to so abort the scan
  182.         filterAllRemaining = true;
  183.         // Not sure if I have to set this here
  184.         returnCode = ReturnCode.NEXT_ROW;
  185.         return true;
  186.     }
  187.  
  188.     public void write(DataOutput out) throws IOException {
  189.         LOG.debug("Writing out filter data");
  190.         out.writeInt(iteratorCount);
  191.         LOG.debug("iteratorCount : {}", iteratorCount);
  192.         while (rowKeyIterator.hasNext()) {
  193.             LOG.debug("Writing range");
  194.             Pair<byte[], byte[]> range = rowKeyIterator.next();
  195.             Bytes.writeByteArray(out, range.getFirst());
  196.             Bytes.writeByteArray(out, range.getSecond());
  197.         }
  198.     }
  199.  
  200.     public void readFields(DataInput in) throws IOException {
  201.         TreeSet<Pair<byte[], byte[]>> ranges = new TreeSet<Pair<byte[], byte[]>>(
  202.                 new FirstValuePairByteComparator());
  203.  
  204.         LOG.debug("Reading fields");
  205.        
  206.         // Load ranges into treeset so we can create a sorted iterator
  207.         int totalRanges = in.readInt();
  208.        
  209.         LOG.debug("Read ranges {}", totalRanges);
  210.        
  211.         for(int i=0; i<totalRanges; i++){
  212.             LOG.debug("Reading range #{}", i+1);
  213.             byte[] first = Bytes.readByteArray(in);
  214.             LOG.debug("Read first range with length {}", first.length);
  215.             byte[] second = Bytes.readByteArray(in);
  216.             LOG.debug("Read second range with length {}", second.length);
  217.  
  218.             Pair<byte[], byte[]> range = Pair.newPair(first, second);
  219.            
  220.             LOG.debug("Read in range {}", bytePairToString(range));
  221.            
  222.             ranges.add(range);
  223.         }
  224.    
  225.         rowKeyIterator = ranges.iterator();
  226.        
  227.         if (LOG.isDebugEnabled())
  228.             LOG.debug("Finished reading fields. Loaded : {} ranges", ranges.size());
  229.     }
  230.  
  231.     private static class FirstValuePairByteComparator implements Comparator<Pair<byte[], byte[]>>, Serializable {
  232.         /**
  233.          * Generated serial version
  234.          */
  235.         private static final long serialVersionUID = 8426766119633439881L;
  236.  
  237.         public int compare(Pair<byte[], byte[]> pair1, Pair<byte[], byte[]> pair2) {
  238.             return Bytes.compareTo(pair1.getFirst(), pair2.getFirst());
  239.         }
  240.     }
  241.  
  242.     /**
  243.      * Simple function used in logging/exceptions to write out a range as a string
  244.      *
  245.      * @param pair
  246.      *            the row key range
  247.      * @return the string representing the row key range
  248.      */
  249.     private static String bytePairToString(Pair<byte[], byte[]> pair) {
  250.         StringBuilder builder = new StringBuilder();
  251.         builder.append("[");
  252.         builder.append(Bytes.toStringBinary(pair.getFirst()));
  253.         builder.append(", ");
  254.         builder.append(Bytes.toStringBinary(pair.getSecond()));
  255.         builder.append("]");
  256.         return builder.toString();
  257.     }
  258. }