import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestTable implements Runnable { HTable table = null; Configuration conf = null; String rowKey = null; int iterations = 0; private static final String TEST_TABLE_NAME = "MyTestTable"; private static final String LARGE_ROW = "ThisIsTheRowYoureLookingFor"; private static int totalUpdates = 0; private static final Logger LOG = LoggerFactory.getLogger(TestTable.class); /** * Construct a new TestTable * @param c * @param rowKey * @param iterations * @throws IOException */ public TestTable(Configuration c, String rowKey, int iterations) throws IOException { conf = c; table = new HTable(conf, TEST_TABLE_NAME); this.rowKey = rowKey; this.iterations = iterations; } public static void main(String[] args) throws Exception { String row = LARGE_ROW; int iterations = 1000; int numThreads = 4; if (args.length == 1 || args.length > 2) { System.out.println("Usage: TestTable [ ]"); System.exit(-1); } LOG.info("Row : " + row); LOG.info("Threads : " + numThreads); LOG.info("IteratorsPerThread: " + iterations); Configuration conf = HBaseConfiguration.create(); HBaseAdmin hbase = new HBaseAdmin(conf); // Create test table if it doesn't exist if (!hbase.tableExists(TEST_TABLE_NAME.getBytes())) { HTableDescriptor desc = new HTableDescriptor(TEST_TABLE_NAME); HColumnDescriptor data = new HColumnDescriptor("data".getBytes()); HColumnDescriptor stats = new HColumnDescriptor("stats".getBytes()); desc.addFamily(data); desc.addFamily(stats); hbase.createTable(desc); // init the table with random rows row = initDatabase(conf); // This might not be necessary, but just in case hbase is doing some work from all the updates // we just sent, we'll give it some time. LOG.info("Starting test in 10s."); Thread.sleep(10000); } List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { Thread t = new Thread(new TestTable(conf, row, iterations)); t.start(); threads.add(t); } for (Thread t : threads) { t.join(); } } private static String initDatabase(Configuration conf) throws IOException { HTable table = new HTable(conf, TEST_TABLE_NAME); LOG.info("Initializing the database. Adding random rows"); // Add some random rows List puts = new ArrayList(); for (int i = 1; i < 100000; i++) { String key = RandomStringUtils.randomAlphanumeric(14); Put put = new Put(Bytes.toBytes(key)); put.add(Bytes.toBytes("data"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(18)), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(50))); put.add(Bytes.toBytes("stats"), Bytes.toBytes("hello"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(8))); puts.add(put); if (puts.size() > 0 && puts.size() % 1000 == 0) { table.put(puts); LOG.info("Added {} rows.", i); } } if (puts.size() > 0) { table.put(puts); } // Now give this one row tons of columns //String key = RandomStringUtils.randomAlphanumeric(14); String key = LARGE_ROW; Put put = new Put(Bytes.toBytes(key)); LOG.info("Row with tons of columns {}", key); put.add(Bytes.toBytes("stats"), Bytes.toBytes("hello"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(8))); for (int i = 0; i < 60000; i++) { put.add(Bytes.toBytes("data"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(18)), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(50))); } table.put(put); return key; } @Override public void run() { for (int i = 1; i <= iterations; i++) { try { Get row = new Get(Bytes.toBytes(rowKey)); Put put = new Put(Bytes.toBytes(rowKey)); //Put put = new Put(Bytes.toBytes(RandomStringUtils.randomAlphanumeric(10))); Result r = table.get(row); Map data = r.getFamilyMap(Bytes.toBytes("data")); for (Map.Entry entry : data.entrySet()) { put.add(Bytes.toBytes("data"), entry.getKey(), entry.getValue()); } Map stats = r.getFamilyMap(Bytes.toBytes("stats")); for (Map.Entry entry : stats.entrySet()) { put.add(Bytes.toBytes("stats"), entry.getKey(), entry.getValue()); } table.put(put); incrementCounter(); } catch (IOException e) { LOG.error("ERROR: ", e); } } } synchronized private void incrementCounter() { totalUpdates++; LOG.info("Sent {} updates", totalUpdates); } }