Advertisement
Ladies_Man

#HADOOP Lab6 (HBase) Filter

Jan 25th, 2016
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.09 KB | None | 0 0
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
  8.  
  9. public class FilterTable {
  10.     //col family
  11.     private static final byte[] CF = "data".getBytes();
  12.     //cols
  13.     private static final byte[] ATTR18_ARR_DELAY_NEW = "arr_delay_new".getBytes();
  14.     private static final byte[] ATTR19_CANCELLED = "cancelled".getBytes();
  15.  
  16.     public static void main(String[] args) throws IOException {
  17.  
  18.         Configuration config = HBaseConfiguration.create();
  19.         config.set("hbase.zookeper.quorum", "localhost");
  20.  
  21.         Connection connection = ConnectionFactory.createConnection(config);
  22.         Table table = connection.getTable(TableName.valueOf("flights"));
  23.  
  24.         String custom_delay = "10.0";
  25.         byte[] start_row = "2015-01-10".getBytes();
  26.         byte[] stop_row = "2015-01-20".getBytes();
  27.  
  28.         Filter custom_filter = new CustomFlightFilter(Float.parseFloat(custom_delay));
  29.         //Filter custom_filter = new CustomFlightFilter();
  30.  
  31.  
  32.         Scan scan = new Scan();
  33.         scan.addFamily(CF);
  34.         scan.setFilter(custom_filter);
  35.         scan.setStartRow(start_row);
  36.         scan.setStopRow(stop_row);
  37.  
  38.         ResultScanner scanner = table.getScanner(scan);
  39.  
  40.         //make sure u have restarted HBase after any changes in CustomFilter
  41.  
  42.         for (Result r : scanner) {
  43.             String s1 = new String(r.getRow(), "UTF-8");
  44.             String s2 = new String(r.getValue(CF, ATTR19_CANCELLED), "UTF-8");
  45.             String s3 = new String(r.getValue(CF, ATTR18_ARR_DELAY_NEW), "UTF-8");
  46.             System.out.println("Key:" + s1 + " Cancelled:" + s2 + ", Delay:" + s3);
  47.         }
  48.         System.out.println("Done");
  49.  
  50.         scanner.close();
  51.         table.close();
  52.         connection.close();
  53.  
  54.         //hbase> create "flights", "data"
  55.         //mvn package
  56.         //mvn compile exec:java -Dexec.mainClass="FillTable"
  57.         //../hbase-1.1.2/bin/stop-hbase.sh
  58.         //../hbase-1.1.2/bin/start-hbase.sh
  59.         //mvn compile exec:java -Dexec.mainClass="FilterTable"
  60.     }
  61. }
  62.  
  63.  
  64.  
  65.  
  66.  
  67.  
  68. FILTER:
  69. import org.apache.hadoop.hbase.Cell;
  70. import org.apache.hadoop.hbase.exceptions.DeserializationException;
  71. import org.apache.hadoop.hbase.filter.Filter;
  72. import org.apache.hadoop.hbase.filter.FilterBase;
  73. import org.apache.hadoop.hbase.filter.PageFilter;
  74. import org.apache.hadoop.hbase.util.Bytes;
  75.  
  76. import java.io.IOException;
  77.  
  78. public class CustomFlightFilter extends FilterBase {
  79.     private float delay;
  80.     private boolean remove_it;
  81.  
  82.     public CustomFlightFilter() {
  83.         super();
  84.         this.remove_it = true;
  85.     }
  86.  
  87.     public CustomFlightFilter(float delay) {
  88.         this.delay = delay;
  89.         this.remove_it = true;
  90.     }
  91.  
  92.     @Override
  93.     public ReturnCode filterKeyValue(Cell cell) throws IOException {
  94.         String column = new String(
  95.                 cell.getQualifierArray(),
  96.                 cell.getQualifierOffset(),
  97.                 cell.getQualifierLength());
  98.  
  99.         String str_value = new String(
  100.                 cell.getValueArray(),
  101.                 cell.getValueOffset(),
  102.                 cell.getValueLength());
  103.  
  104.         //string № 4765 has cancelled = 0.0, and arr_delay_new is empty
  105.  
  106.         String key = new String(
  107.                 cell.getRowArray(),
  108.                 cell.getRowOffset(),
  109.                 cell.getRowLength());
  110.  
  111.         float value;
  112.  
  113.         if (delay > 0.f) {
  114.             if (column.equals("arr_delay_new")) {
  115.  
  116.                 /*if (str_value.isEmpty()) {
  117.                     value = (float)0;
  118.                 } else {
  119.                     value = Float.parseFloat(str_value);
  120.                 }*/
  121.  
  122.                 System.out.println("key:" + key + "col:" + column + " val:" + str_value);
  123.  
  124.                 if (!str_value.isEmpty() && !str_value.equals("") &&  Float.parseFloat(str_value) > 10.f) {
  125.                     remove_it = false;
  126.                 }
  127.                 System.out.println("remove_it="+remove_it);
  128.             }
  129.         } else {
  130.  
  131.             if (column.equals("cancelled")) {
  132.  
  133.                 value = Float.parseFloat(str_value);
  134.  
  135.                 if ((float)0 != value) {
  136.                     remove_it = false;
  137.  
  138.                 }
  139.                 System.out.println("key:" + key +"remove_it="+remove_it);
  140.  
  141.             }
  142.         }
  143.  
  144.         return ReturnCode.INCLUDE;
  145.         //insert this cell into processed row like <cell> + <cell> + <cell> ...
  146.         //all the cells should remain in processed row
  147.     }
  148.  
  149.     public static Filter parseFrom(byte[] pbBytes) throws DeserializationException{
  150.         return new CustomFlightFilter(Bytes.toFloat(pbBytes));
  151.     }
  152.  
  153.     @Override
  154.     public byte[] toByteArray() throws IOException {
  155.         return Bytes.toBytes(delay);
  156.     }
  157.  
  158.     @Override
  159.     public boolean filterRow() throws IOException {
  160.         return remove_it;
  161.     }
  162.  
  163.     @Override
  164.     public void reset() throws IOException {
  165.         remove_it = true;
  166.     }
  167. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement