import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestTable implements Runnable {
HTable table = null;
Configuration conf = null;
String rowKey = null;
int iterations = 0;
private static final String TEST_TABLE_NAME = "MyTestTable";
private static final String LARGE_ROW = "ThisIsTheRowYoureLookingFor";
private static int totalUpdates = 0;
private static final Logger LOG = LoggerFactory.getLogger(TestTable.class);
/**
* Construct a new TestTable
* @param c
* @param rowKey
* @param iterations
* @throws IOException
*/
public TestTable(Configuration c, String rowKey, int iterations) throws IOException {
conf = c;
table = new HTable(conf, TEST_TABLE_NAME);
this.rowKey = rowKey;
this.iterations = iterations;
}
public static void main(String[] args) throws Exception {
String row = LARGE_ROW;
int iterations = 1000;
int numThreads = 4;
if (args.length == 1 || args.length > 2) {
System.out.println("Usage: TestTable [<IterationsPerThread> <NumThreads>]");
System.exit(-1);
}
LOG.info("Row : " + row);
LOG.info("Threads : " + numThreads);
LOG.info("IteratorsPerThread: " + iterations);
Configuration conf = HBaseConfiguration.create();
HBaseAdmin hbase = new HBaseAdmin(conf);
// Create test table if it doesn't exist
if (!hbase.tableExists(TEST_TABLE_NAME.getBytes())) {
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE_NAME);
HColumnDescriptor data = new HColumnDescriptor("data".getBytes());
HColumnDescriptor stats = new HColumnDescriptor("stats".getBytes());
desc.addFamily(data);
desc.addFamily(stats);
hbase.createTable(desc);
// init the table with random rows
row = initDatabase(conf);
// This might not be necessary, but just in case hbase is doing some work from all the updates
// we just sent, we'll give it some time.
LOG.info("Starting test in 10s.");
Thread.sleep(10000);
}
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < numThreads; i++) {
Thread t = new Thread(new TestTable(conf, row, iterations));
t.start();
threads.add(t);
}
for (Thread t : threads) {
t.join();
}
}
private static String initDatabase(Configuration conf) throws IOException {
HTable table = new HTable(conf, TEST_TABLE_NAME);
LOG.info("Initializing the database. Adding random rows");
// Add some random rows
List<Put> puts = new ArrayList<Put>();
for (int i = 1; i < 100000; i++) {
String key = RandomStringUtils.randomAlphanumeric(14);
Put put = new Put(Bytes.toBytes(key));
put.add(Bytes.toBytes("data"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(18)),
Bytes.toBytes(RandomStringUtils.randomAlphanumeric(50)));
put.add(Bytes.toBytes("stats"), Bytes.toBytes("hello"),
Bytes.toBytes(RandomStringUtils.randomAlphanumeric(8)));
puts.add(put);
if (puts.size() > 0 && puts.size() % 1000 == 0) {
table.put(puts);
LOG.info("Added {} rows.", i);
}
}
if (puts.size() > 0) {
table.put(puts);
}
// Now give this one row tons of columns
//String key = RandomStringUtils.randomAlphanumeric(14);
String key = LARGE_ROW;
Put put = new Put(Bytes.toBytes(key));
LOG.info("Row with tons of columns {}", key);
put.add(Bytes.toBytes("stats"), Bytes.toBytes("hello"),
Bytes.toBytes(RandomStringUtils.randomAlphanumeric(8)));
for (int i = 0; i < 60000; i++) {
put.add(Bytes.toBytes("data"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(18)),
Bytes.toBytes(RandomStringUtils.randomAlphanumeric(50)));
}
table.put(put);
return key;
}
@Override
public void run() {
for (int i = 1; i <= iterations; i++) {
try {
Get row = new Get(Bytes.toBytes(rowKey));
Put put = new Put(Bytes.toBytes(rowKey));
//Put put = new Put(Bytes.toBytes(RandomStringUtils.randomAlphanumeric(10)));
Result r = table.get(row);
Map<byte[], byte[]> data = r.getFamilyMap(Bytes.toBytes("data"));
for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
put.add(Bytes.toBytes("data"), entry.getKey(), entry.getValue());
}
Map<byte[], byte[]> stats = r.getFamilyMap(Bytes.toBytes("stats"));
for (Map.Entry<byte[], byte[]> entry : stats.entrySet()) {
put.add(Bytes.toBytes("stats"), entry.getKey(), entry.getValue());
}
table.put(put);
incrementCounter();
} catch (IOException e) {
LOG.error("ERROR: ", e);
}
}
}
synchronized private void incrementCounter() {
totalUpdates++;
LOG.info("Sent {} updates", totalUpdates);
}
}