Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import org.apache.commons.lang.RandomStringUtils;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.client.Get;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class TestTable implements Runnable {
- HTable table = null;
- Configuration conf = null;
- String rowKey = null;
- int iterations = 0;
- private static final String TEST_TABLE_NAME = "MyTestTable";
- private static final String LARGE_ROW = "ThisIsTheRowYoureLookingFor";
- private static int totalUpdates = 0;
- private static final Logger LOG = LoggerFactory.getLogger(TestTable.class);
- /**
- * Construct a new TestTable
- * @param c
- * @param rowKey
- * @param iterations
- * @throws IOException
- */
- public TestTable(Configuration c, String rowKey, int iterations) throws IOException {
- conf = c;
- table = new HTable(conf, TEST_TABLE_NAME);
- this.rowKey = rowKey;
- this.iterations = iterations;
- }
- public static void main(String[] args) throws Exception {
- String row = LARGE_ROW;
- int iterations = 1000;
- int numThreads = 4;
- if (args.length == 1 || args.length > 2) {
- System.out.println("Usage: TestTable [<IterationsPerThread> <NumThreads>]");
- System.exit(-1);
- }
- LOG.info("Row : " + row);
- LOG.info("Threads : " + numThreads);
- LOG.info("IteratorsPerThread: " + iterations);
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbase = new HBaseAdmin(conf);
- // Create test table if it doesn't exist
- if (!hbase.tableExists(TEST_TABLE_NAME.getBytes())) {
- HTableDescriptor desc = new HTableDescriptor(TEST_TABLE_NAME);
- HColumnDescriptor data = new HColumnDescriptor("data".getBytes());
- HColumnDescriptor stats = new HColumnDescriptor("stats".getBytes());
- desc.addFamily(data);
- desc.addFamily(stats);
- hbase.createTable(desc);
- // init the table with random rows
- row = initDatabase(conf);
- // This might not be necessary, but just in case hbase is doing some work from all the updates
- // we just sent, we'll give it some time.
- LOG.info("Starting test in 10s.");
- Thread.sleep(10000);
- }
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < numThreads; i++) {
- Thread t = new Thread(new TestTable(conf, row, iterations));
- t.start();
- threads.add(t);
- }
- for (Thread t : threads) {
- t.join();
- }
- }
- private static String initDatabase(Configuration conf) throws IOException {
- HTable table = new HTable(conf, TEST_TABLE_NAME);
- LOG.info("Initializing the database. Adding random rows");
- // Add some random rows
- List<Put> puts = new ArrayList<Put>();
- for (int i = 1; i < 100000; i++) {
- String key = RandomStringUtils.randomAlphanumeric(14);
- Put put = new Put(Bytes.toBytes(key));
- put.add(Bytes.toBytes("data"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(18)),
- Bytes.toBytes(RandomStringUtils.randomAlphanumeric(50)));
- put.add(Bytes.toBytes("stats"), Bytes.toBytes("hello"),
- Bytes.toBytes(RandomStringUtils.randomAlphanumeric(8)));
- puts.add(put);
- if (puts.size() > 0 && puts.size() % 1000 == 0) {
- table.put(puts);
- LOG.info("Added {} rows.", i);
- }
- }
- if (puts.size() > 0) {
- table.put(puts);
- }
- // Now give this one row tons of columns
- //String key = RandomStringUtils.randomAlphanumeric(14);
- String key = LARGE_ROW;
- Put put = new Put(Bytes.toBytes(key));
- LOG.info("Row with tons of columns {}", key);
- put.add(Bytes.toBytes("stats"), Bytes.toBytes("hello"),
- Bytes.toBytes(RandomStringUtils.randomAlphanumeric(8)));
- for (int i = 0; i < 60000; i++) {
- put.add(Bytes.toBytes("data"), Bytes.toBytes(RandomStringUtils.randomAlphanumeric(18)),
- Bytes.toBytes(RandomStringUtils.randomAlphanumeric(50)));
- }
- table.put(put);
- return key;
- }
- @Override
- public void run() {
- for (int i = 1; i <= iterations; i++) {
- try {
- Get row = new Get(Bytes.toBytes(rowKey));
- Put put = new Put(Bytes.toBytes(rowKey));
- //Put put = new Put(Bytes.toBytes(RandomStringUtils.randomAlphanumeric(10)));
- Result r = table.get(row);
- Map<byte[], byte[]> data = r.getFamilyMap(Bytes.toBytes("data"));
- for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
- put.add(Bytes.toBytes("data"), entry.getKey(), entry.getValue());
- }
- Map<byte[], byte[]> stats = r.getFamilyMap(Bytes.toBytes("stats"));
- for (Map.Entry<byte[], byte[]> entry : stats.entrySet()) {
- put.add(Bytes.toBytes("stats"), entry.getKey(), entry.getValue());
- }
- table.put(put);
- incrementCounter();
- } catch (IOException e) {
- LOG.error("ERROR: ", e);
- }
- }
- }
- synchronized private void incrementCounter() {
- totalUpdates++;
- LOG.info("Sent {} updates", totalUpdates);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement