package com.intuit.ihub.hbase.poc.aggregation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import com.intuit.ihub.hbase.poc.aggregation.interpreter.DoubleColumnInterpreter;
/**
 * Proof-of-concept client that runs server-side aggregations over the
 * {@code t:amt} column of an HBase table via {@link AggregationClient}.
 */
public class TransactionAmountSum {
    /** Shared HBase configuration; {@link #doAggregation} sets the coprocessor class on it. */
    private static final Configuration conf = HBaseConfiguration.create();

    /**
     * Computes the row count and the sum of the {@code t:amt} column for the
     * scan that starts at {@code prefix} and stops at {@code endRow}, and
     * prints both values to standard output.
     *
     * <p>Any failure raised by the aggregation calls is caught and its stack
     * trace is printed; nothing is propagated to the caller.
     *
     * @param prefix    row key used as the scan's start row
     * @param endRow    row key used as the scan's stop row
     * @param tableName name of the table to aggregate over
     */
    public static void doAggregation(String prefix, String endRow, String tableName)
    {
        System.out.println("Setting: hbase.coprocessor.region.classes ");
        conf.set("hbase.coprocessor.region.classes", "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
        AggregationClient aClient = new AggregationClient(conf);

        // NOTE(review): the previous version built a PrefixFilter from `prefix`
        // but never attached it to the scan, so it had no effect. The scan is
        // still bounded only by the start/stop rows below; if prefix filtering
        // was intended, add scan.setFilter(...) deliberately.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("t"), Bytes.toBytes("amt"));
        scan.setStartRow(Bytes.toBytes(prefix));
        scan.setStopRow(Bytes.toBytes(endRow));

        // Project-local interpreter that maps cell values to Double.
        ColumnInterpreter<Double,Double> ci = new DoubleColumnInterpreter();
        byte[] table = Bytes.toBytes(tableName);
        try {
            long rowCount = aClient.rowCount(table, ci, scan);
            // Kept boxed so that a null result (presumably returned when no
            // cells matched -- confirm against the HBase version in use) is
            // printed instead of failing on unboxing.
            Double sum = aClient.sum(table, ci, scan);
            System.out.print("Sum is:" + sum);
            // Report the row count itself; this line used to print `sum` again.
            System.out.print("rowcount is:" + rowCount);
        } catch (Throwable e) {
            // Throwable is caught because the aggregation calls may surface any
            // failure type; this is best-effort reporting for a POC.
            e.printStackTrace();
        }
    }
}