1. package com.cerner.kepler.filters;
  2.  
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. import java.io.Serializable;
  7. import java.util.Arrays;
  8. import java.util.Comparator;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. import java.util.TreeSet;
  12.  
  13. import org.apache.hadoop.hbase.KeyValue;
  14. import org.apache.hadoop.hbase.filter.FilterBase;
  15. import org.apache.hadoop.hbase.util.Bytes;
  16. import org.apache.hadoop.hbase.util.Pair;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19.  
  20. public class RowRangeFilter extends FilterBase {
  21. // Go go Row Ranger, the might filterin' row ranger!
  22.  
  23. /**
  24. * Logger
  25. */
  26. private static final Logger LOG = LoggerFactory.getLogger(RowRangeFilter.class);
  27.  
  28. /**
  29. * Boolean to indicate if we have loaded our scan into the first range
  30. */
  31. private boolean loaded = false;
  32.  
  33. /**
  34. * Boolean to indicate if we should abort the scan (true=abort)
  35. */
  36. private boolean filterAllRemaining = false;
  37.  
  38. /**
  39. * An object which determines what our filter does
  40. */
  41. private ReturnCode returnCode;
  42.  
  43. /**
  44. * The current byte[] pair that represents our row key range we are in
  45. */
  46. private Pair<byte[], byte[]> currentRange;
  47.  
  48. /**
  49. * A sorted iterator contained our row key ranges
  50. */
  51. private Iterator<Pair<byte[], byte[]>> rowKeyIterator;
  52.  
  53. private int iteratorCount;
  54.  
  55. /**
  56. * Constructor used by HBase. DO NOT USE!
  57. */
  58. public RowRangeFilter() {
  59. super();
  60. LOG.debug("Creating new filter server side");
  61. }
  62.  
  63. /**
  64. * Constructs the filter from a set of row key ranges represented by {@link Pair}s of byte[]'s.
  65. *
  66. * @param ranges
  67. * the set of row key ranges we want to filter on
  68. * @throws IllegalArgumentException
  69. * if ranges is null or empty, or if any range's starting value is < then its end, or if any range overlaps with
  70. * another range
  71. */
  72. public RowRangeFilter(Set<Pair<byte[], byte[]>> ranges) {
  73.  
  74. if (ranges == null)
  75. throw new IllegalArgumentException("ranges cannot be null");
  76. if (ranges.isEmpty())
  77. throw new IllegalArgumentException("ranges must contain at least 1 pair");
  78.  
  79. // By moving our data to a TreeSet we sort our data into the correct order before handing it off to hbase-server
  80. TreeSet<Pair<byte[], byte[]>> treeSet = new TreeSet<Pair<byte[], byte[]>>(
  81. new FirstValuePairByteComparator());
  82.  
  83. // TODO: Do we combine or throw exception or even ignore?
  84.  
  85. Iterator<Pair<byte[], byte[]>> iterator = ranges.iterator();
  86. while (iterator.hasNext()) {
  87. Pair<byte[], byte[]> range = iterator.next();
  88.  
  89. // Verify that each range is valid (i.e. start <= end)
  90. if (Bytes.compareTo(range.getFirst(), range.getSecond()) > 0)
  91. throw new IllegalArgumentException(
  92. "The following range is invalid since its starting value is less then its end : "
  93. + bytePairToString(range));
  94.  
  95. // Verify that this new range does not overlap any existing ranges
  96. for (Pair<byte[], byte[]> addedRange : treeSet) {
  97. // Verify that range's start point is not contained in addedRange
  98. if (Bytes.compareTo(range.getFirst(), addedRange.getFirst()) >= 0
  99. && Bytes.compareTo(range.getFirst(), addedRange.getSecond()) < 0)
  100. throw new IllegalArgumentException("The range : " + bytePairToString(range)
  101. + " overlaps an existing range : " + treeSet.remove(range));
  102.  
  103. // Verify that range's end point is not contained in addedRange
  104. if (Bytes.compareTo(range.getSecond(), addedRange.getFirst()) > 0
  105. && Bytes.compareTo(range.getSecond(), addedRange.getSecond()) <= 0)
  106. throw new IllegalArgumentException("The range : " + bytePairToString(range)
  107. + " overlaps an existing range : " + bytePairToString(addedRange));
  108. }
  109.  
  110. treeSet.add(range);
  111. }
  112. rowKeyIterator = treeSet.iterator();
  113. iteratorCount = treeSet.size();
  114. }
  115.  
  116. @Override
  117. public ReturnCode filterKeyValue(KeyValue ignored) {
  118. return returnCode;
  119. }
  120.  
  121. @Override
  122. public boolean filterAllRemaining() {
  123. return filterAllRemaining;
  124. }
  125.  
  126. @Override
  127. public KeyValue getNextKeyHint(KeyValue kv) {
  128. return KeyValue.createFirstOnRow(currentRange.getFirst());
  129. }
  130.  
  131. @Override
  132. public boolean filterRowKey(byte[] buffer, int offset, int length) {
  133.  
  134. byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);
  135.  
  136. // If this is the first time filtering seek to the appropriate range
  137. if (!loaded) {
  138. LOG.debug("Loading currentRange");
  139. while (rowKeyIterator.hasNext()) {
  140. currentRange = rowKeyIterator.next();
  141. if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  142. break;
  143. }
  144. }
  145.  
  146. loaded = true;
  147.  
  148. if (Bytes.compareTo(rowKey, currentRange.getFirst()) < 0) {
  149. if (LOG.isDebugEnabled())
  150. LOG.debug(
  151. "Current rowKey: {} is not in first range : {}, seeking to start of range",
  152. Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  153. returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  154. return true;
  155. }
  156. }
  157.  
  158. // Verify that the rowKey is still within our range
  159. if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  160. if (LOG.isDebugEnabled())
  161. LOG.debug("Current rowKey : {} is contained within range : {}",
  162. Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  163. returnCode = ReturnCode.INCLUDE;
  164. return false;
  165. }
  166.  
  167. if (rowKeyIterator.hasNext()) {
  168. if (LOG.isDebugEnabled())
  169. LOG.debug(
  170. "Current rowKey : {} is not contained by range : {}, seeking to next range",
  171. Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  172. // Seek to the next range
  173. currentRange = rowKeyIterator.next();
  174. returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  175. return true;
  176. }
  177.  
  178. if (LOG.isDebugEnabled())
  179. LOG.debug("Current rowKey : {} is not contained within any ranges, aborting scan",
  180. Bytes.toStringBinary(rowKey));
  181.  
  182. // We don't have another range to seek to so abort the scan
  183. filterAllRemaining = true;
  184. // Not sure if I have to set this here
  185. returnCode = ReturnCode.NEXT_ROW;
  186. return true;
  187. }
  188.  
  189. public void write(DataOutput out) throws IOException {
  190. out.writeInt(iteratorCount);
  191. while (rowKeyIterator.hasNext()) {
  192. Pair<byte[], byte[]> range = rowKeyIterator.next();
  193.  
  194. byte[] first = range.getFirst();
  195. out.writeInt(first.length);
  196. out.write(first);
  197.  
  198. byte[] second = range.getSecond();
  199. out.writeInt(second.length);
  200. out.write(second);
  201. }
  202. }
  203.  
  204. public void readFields(DataInput in) throws IOException {
  205. TreeSet<Pair<byte[], byte[]>> ranges = new TreeSet<Pair<byte[], byte[]>>(
  206. new FirstValuePairByteComparator());
  207.  
  208. LOG.debug("Reading fields");
  209.  
  210. // Load ranges into treeset so we can create a sorted iterator
  211. int totalRanges = in.readInt();
  212.  
  213. for(int i=0; i<totalRanges; i++){
  214. byte[] first = new byte[in.readInt()];
  215. in.readFully(first);
  216. byte[] second = new byte[in.readInt()];
  217. in.readFully(second);
  218.  
  219. ranges.add(Pair.newPair(first, second));
  220. }
  221.  
  222. rowKeyIterator = ranges.iterator();
  223.  
  224. if (LOG.isDebugEnabled())
  225. LOG.debug("Finished reading fields. Loaded : {} ranges", ranges.size());
  226. }
  227.  
  228. private static class FirstValuePairByteComparator implements Comparator<Pair<byte[], byte[]>>, Serializable {
  229. /**
  230. * Generated serial version
  231. */
  232. private static final long serialVersionUID = 8426766119633439881L;
  233.  
  234. public int compare(Pair<byte[], byte[]> pair1, Pair<byte[], byte[]> pair2) {
  235. return Bytes.compareTo(pair1.getFirst(), pair2.getFirst());
  236. }
  237. }
  238.  
  239. /**
  240. * Simple function used in logging/exceptions to write out a range as a string
  241. *
  242. * @param pair
  243. * the row key range
  244. * @return the string representing the row key range
  245. */
  246. private static String bytePairToString(Pair<byte[], byte[]> pair) {
  247. StringBuilder builder = new StringBuilder();
  248. builder.append("[");
  249. builder.append(Bytes.toStringBinary(pair.getFirst()));
  250. builder.append(", ");
  251. builder.append(Bytes.toStringBinary(pair.getSecond()));
  252. builder.append("]");
  253. return builder.toString();
  254. }
  255. }