import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Insert implements Runnable {
public static final String TEST_TABLE = "test";
public static final String TEST_FAM = "test";
public static final String COL_PREFIX = "col_";
private final Logger logger = LoggerFactory.getLogger(getClass());
private final HTablePool tablePool;
private final HBaseAdmin admin;
private final Configuration config;
private final boolean useCompression;
private final int numRows;
private final int numThreads;
private final int batchSize;
private final int numRegions;
public static void main(String[] args) throws Exception {
Insert insert = new Insert();
insert.exec();
}
public Insert() throws Exception {
useCompression = true;
numRows = 1000000;
batchSize = 200;
numThreads = 8;
numRegions = 10;
config = HBaseConfiguration.create();
tablePool = new HTablePool(config, Integer.MAX_VALUE);
admin = new HBaseAdmin(config);
}
public void exec() throws Exception {
BoundedExecutor exec = new BoundedExecutor(numThreads);
createTable();
for (int i = 0; i < numRows / batchSize; i++) {
exec.execute(this);
}
exec.shutdownAndWait();
}
public void createTable() throws Exception {
if (admin.tableExists(TEST_TABLE)) {
logger.info("Dropping table...");
admin.disableTable(TEST_TABLE);
admin.deleteTable(TEST_TABLE);
}
logger.info("Creating table...");
HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
HColumnDescriptor hcdEvents = new HColumnDescriptor(TEST_FAM);
if (useCompression) {
hcdEvents.setCompressionType(Algorithm.SNAPPY);
}
htd.addFamily(hcdEvents);
admin.createTable(htd, new byte[] {0x10}, new byte[] {(byte) 0xF0}, numRegions);
logger.info("Table created.");
}
@Override
public void run() {
List<Put> putList = new ArrayList<Put>();
for (int i = 0; i < batchSize; i++) {
putList.add(createPut());
}
long start = System.currentTimeMillis();
insertBatch(putList);
logger.info("PUT: {} ms", System.currentTimeMillis() - start);
}
private void insertBatch(List<Put> putList) {
HTableInterface table = tablePool.getTable(TEST_TABLE);
try {
table.put(putList);
} catch (IOException x) {
x.printStackTrace();
} finally {
try {
table.close();
} catch (IOException x) {
x.printStackTrace();
}
}
}
private Put createPut() {
Put put = new Put(UUID.randomUUID().toString().getBytes());
for (int i = 0; i < 100; i++) {
String prefix = COL_PREFIX + i + "_";
for (int j = 0; j < 20; j++) {
String col = prefix + j;
put.add(Bytes.toBytes(TEST_FAM), Bytes.toBytes(col), Bytes.toBytes("data"));
}
}
return put;
}
}