Advertisement
Guest User

Untitled

a guest
Feb 20th, 2013
147
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.59 KB | None | 0 0
  1. package com.cerner.kepler.filters;
  2.  
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. import java.io.Serializable;
  7. import java.util.Arrays;
  8. import java.util.Comparator;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. import java.util.TreeSet;
  12.  
  13. import org.apache.hadoop.hbase.KeyValue;
  14. import org.apache.hadoop.hbase.filter.FilterBase;
  15. import org.apache.hadoop.hbase.util.Bytes;
  16. import org.apache.hadoop.hbase.util.Pair;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19.  
  20. public class RowRangeFilter extends FilterBase {
  21.  
  22. /**
  23. * Logger
  24. */
  25. private static final Logger LOG = LoggerFactory.getLogger(RowRangeFilter.class);
  26.  
  27. /**
  28. * Boolean to indicate if we have loaded our scan into the first range
  29. */
  30. private boolean loaded = false;
  31.  
  32. /**
  33. * Boolean to indicate if we should abort the scan (true=abort)
  34. */
  35. private boolean filterAllRemaining = false;
  36.  
  37. /**
  38. * An object which determines what our filter does
  39. */
  40. private ReturnCode returnCode;
  41.  
  42. /**
  43. * The current byte[] pair that represents our row key range we are in
  44. */
  45. private Pair<byte[], byte[]> currentRange;
  46.  
  47. /**
  48. * A sorted iterator contained our row key ranges
  49. */
  50. private Iterator<Pair<byte[], byte[]>> rowKeyIterator;
  51.  
  52. private int iteratorCount;
  53.  
  54. /**
  55. * Constructor used by HBase. DO NOT USE!
  56. */
  57. public RowRangeFilter() {
  58. super();
  59. LOG.debug("Creating new filter server side");
  60. }
  61.  
  62. /**
  63. * Constructs the filter from a set of row key ranges represented by {@link Pair}s of byte[]'s.
  64. *
  65. * @param ranges
  66. * the set of row key ranges we want to filter on
  67. * @throws IllegalArgumentException
  68. * if ranges is null or empty, or if any range's starting value is < then its end, or if any range overlaps with
  69. * another range
  70. */
  71. public RowRangeFilter(Set<Pair<byte[], byte[]>> ranges) {
  72.  
  73. if (ranges == null)
  74. throw new IllegalArgumentException("ranges cannot be null");
  75. if (ranges.isEmpty())
  76. throw new IllegalArgumentException("ranges must contain at least 1 pair");
  77.  
  78. // By moving our data to a TreeSet we sort our data into the correct order before handing it off to hbase-server
  79. TreeSet<Pair<byte[], byte[]>> treeSet = new TreeSet<Pair<byte[], byte[]>>(
  80. new FirstValuePairByteComparator());
  81.  
  82. // TODO: Do we combine or throw exception or even ignore?
  83.  
  84. Iterator<Pair<byte[], byte[]>> iterator = ranges.iterator();
  85. while (iterator.hasNext()) {
  86. Pair<byte[], byte[]> range = iterator.next();
  87.  
  88. // Verify that each range is valid (i.e. start <= end)
  89. if (Bytes.compareTo(range.getFirst(), range.getSecond()) > 0)
  90. throw new IllegalArgumentException(
  91. "The following range is invalid since its starting value is less then its end : "
  92. + bytePairToString(range));
  93.  
  94. // Verify that this new range does not overlap any existing ranges
  95. for (Pair<byte[], byte[]> addedRange : treeSet) {
  96. // Verify that range's start point is not contained in addedRange
  97. if (Bytes.compareTo(range.getFirst(), addedRange.getFirst()) >= 0
  98. && Bytes.compareTo(range.getFirst(), addedRange.getSecond()) < 0)
  99. throw new IllegalArgumentException("The range : " + bytePairToString(range)
  100. + " overlaps an existing range : " + treeSet.remove(range));
  101.  
  102. // Verify that range's end point is not contained in addedRange
  103. if (Bytes.compareTo(range.getSecond(), addedRange.getFirst()) > 0
  104. && Bytes.compareTo(range.getSecond(), addedRange.getSecond()) <= 0)
  105. throw new IllegalArgumentException("The range : " + bytePairToString(range)
  106. + " overlaps an existing range : " + bytePairToString(addedRange));
  107. }
  108.  
  109. treeSet.add(range);
  110. }
  111. rowKeyIterator = treeSet.iterator();
  112. iteratorCount = treeSet.size();
  113. }
  114.  
  115. @Override
  116. public ReturnCode filterKeyValue(KeyValue ignored) {
  117. return returnCode;
  118. }
  119.  
  120. @Override
  121. public boolean filterAllRemaining() {
  122. return filterAllRemaining;
  123. }
  124.  
  125. @Override
  126. public KeyValue getNextKeyHint(KeyValue kv) {
  127. return KeyValue.createFirstOnRow(currentRange.getFirst());
  128. }
  129.  
  130. @Override
  131. public boolean filterRowKey(byte[] buffer, int offset, int length) {
  132.  
  133. byte[] rowKey = Arrays.copyOfRange(buffer, offset, offset + length);
  134.  
  135. // If this is the first time filtering seek to the appropriate range
  136. if (!loaded) {
  137. LOG.debug("Loading currentRange");
  138. while (rowKeyIterator.hasNext()) {
  139. currentRange = rowKeyIterator.next();
  140. if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  141. break;
  142. }
  143. }
  144.  
  145. loaded = true;
  146.  
  147. if (Bytes.compareTo(rowKey, currentRange.getFirst()) < 0) {
  148. if (LOG.isDebugEnabled())
  149. LOG.debug(
  150. "Current rowKey: {} is not in first range : {}, seeking to start of range",
  151. Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  152. returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  153. return true;
  154. }
  155. }
  156.  
  157. // Verify that the rowKey is still within our range
  158. if (Bytes.compareTo(rowKey, currentRange.getSecond()) < 0) {
  159. if (LOG.isDebugEnabled())
  160. LOG.debug("Current rowKey : {} is contained within range : {}",
  161. Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  162. returnCode = ReturnCode.INCLUDE;
  163. return false;
  164. }
  165.  
  166. if (rowKeyIterator.hasNext()) {
  167. if (LOG.isDebugEnabled())
  168. LOG.debug(
  169. "Current rowKey : {} is not contained by range : {}, seeking to next range",
  170. Bytes.toStringBinary(rowKey), bytePairToString(currentRange));
  171. // Seek to the next range
  172. currentRange = rowKeyIterator.next();
  173. returnCode = ReturnCode.SEEK_NEXT_USING_HINT;
  174. return true;
  175. }
  176.  
  177. if (LOG.isDebugEnabled())
  178. LOG.debug("Current rowKey : {} is not contained within any ranges, aborting scan",
  179. Bytes.toStringBinary(rowKey));
  180.  
  181. // We don't have another range to seek to so abort the scan
  182. filterAllRemaining = true;
  183. // Not sure if I have to set this here
  184. returnCode = ReturnCode.NEXT_ROW;
  185. return true;
  186. }
  187.  
  188. public void write(DataOutput out) throws IOException {
  189. LOG.debug("Writing out filter data");
  190. out.writeInt(iteratorCount);
  191. LOG.debug("iteratorCount : {}", iteratorCount);
  192. while (rowKeyIterator.hasNext()) {
  193. LOG.debug("Writing range");
  194. Pair<byte[], byte[]> range = rowKeyIterator.next();
  195. Bytes.writeByteArray(out, range.getFirst());
  196. Bytes.writeByteArray(out, range.getSecond());
  197. }
  198. }
  199.  
  200. public void readFields(DataInput in) throws IOException {
  201. TreeSet<Pair<byte[], byte[]>> ranges = new TreeSet<Pair<byte[], byte[]>>(
  202. new FirstValuePairByteComparator());
  203.  
  204. LOG.debug("Reading fields");
  205.  
  206. // Load ranges into treeset so we can create a sorted iterator
  207. int totalRanges = in.readInt();
  208.  
  209. LOG.debug("Read ranges {}", totalRanges);
  210.  
  211. for(int i=0; i<totalRanges; i++){
  212. LOG.debug("Reading range #{}", i+1);
  213. byte[] first = Bytes.readByteArray(in);
  214. LOG.debug("Read first range with length {}", first.length);
  215. byte[] second = Bytes.readByteArray(in);
  216. LOG.debug("Read second range with length {}", second.length);
  217.  
  218. Pair<byte[], byte[]> range = Pair.newPair(first, second);
  219.  
  220. LOG.debug("Read in range {}", bytePairToString(range));
  221.  
  222. ranges.add(range);
  223. }
  224.  
  225. rowKeyIterator = ranges.iterator();
  226.  
  227. if (LOG.isDebugEnabled())
  228. LOG.debug("Finished reading fields. Loaded : {} ranges", ranges.size());
  229. }
  230.  
  231. private static class FirstValuePairByteComparator implements Comparator<Pair<byte[], byte[]>>, Serializable {
  232. /**
  233. * Generated serial version
  234. */
  235. private static final long serialVersionUID = 8426766119633439881L;
  236.  
  237. public int compare(Pair<byte[], byte[]> pair1, Pair<byte[], byte[]> pair2) {
  238. return Bytes.compareTo(pair1.getFirst(), pair2.getFirst());
  239. }
  240. }
  241.  
  242. /**
  243. * Simple function used in logging/exceptions to write out a range as a string
  244. *
  245. * @param pair
  246. * the row key range
  247. * @return the string representing the row key range
  248. */
  249. private static String bytePairToString(Pair<byte[], byte[]> pair) {
  250. StringBuilder builder = new StringBuilder();
  251. builder.append("[");
  252. builder.append(Bytes.toStringBinary(pair.getFirst()));
  253. builder.append(", ");
  254. builder.append(Bytes.toStringBinary(pair.getSecond()));
  255. builder.append("]");
  256. return builder.toString();
  257. }
  258. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement