import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Insert implements Runnable { public static final String TEST_TABLE = "test"; public static final String TEST_FAM = "test"; public static final String COL_PREFIX = "col_"; private final Logger logger = LoggerFactory.getLogger(getClass()); private final HTablePool tablePool; private final HBaseAdmin admin; private final Configuration config; private final boolean useCompression; private final int numRows; private final int numThreads; private final int batchSize; private final int numRegions; public static void main(String[] args) throws Exception { Insert insert = new Insert(); insert.exec(); } public Insert() throws Exception { useCompression = true; numRows = 1000000; batchSize = 200; numThreads = 8; numRegions = 10; config = HBaseConfiguration.create(); tablePool = new HTablePool(config, Integer.MAX_VALUE); admin = new HBaseAdmin(config); } public void exec() throws Exception { BoundedExecutor exec = new BoundedExecutor(numThreads); createTable(); for (int i = 0; i < numRows / batchSize; i++) { exec.execute(this); } exec.shutdownAndWait(); } public void createTable() throws Exception { if (admin.tableExists(TEST_TABLE)) { logger.info("Dropping table..."); admin.disableTable(TEST_TABLE); admin.deleteTable(TEST_TABLE); } logger.info("Creating table..."); HTableDescriptor htd = new HTableDescriptor(TEST_TABLE); HColumnDescriptor hcdEvents = new HColumnDescriptor(TEST_FAM); if (useCompression) { hcdEvents.setCompressionType(Algorithm.SNAPPY); } htd.addFamily(hcdEvents); admin.createTable(htd, new byte[] {0x10}, new byte[] {(byte) 0xF0}, numRegions); logger.info("Table created."); } @Override public void run() { List putList = new ArrayList(); for (int i = 0; i < batchSize; i++) { putList.add(createPut()); } long start = System.currentTimeMillis(); insertBatch(putList); logger.info("PUT: {} ms", System.currentTimeMillis() - start); } private void insertBatch(List putList) { HTableInterface table = tablePool.getTable(TEST_TABLE); try { table.put(putList); } catch (IOException x) { x.printStackTrace(); } finally { try { table.close(); } catch (IOException x) { x.printStackTrace(); } } } private Put createPut() { Put put = new Put(UUID.randomUUID().toString().getBytes()); for (int i = 0; i < 100; i++) { String prefix = COL_PREFIX + i + "_"; for (int j = 0; j < 20; j++) { String col = prefix + j; put.add(Bytes.toBytes(TEST_FAM), Bytes.toBytes(col), Bytes.toBytes("data")); } } return put; } }