Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.UUID;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTableInterface;
- import org.apache.hadoop.hbase.client.HTablePool;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class Insert implements Runnable {
- public static final String TEST_TABLE = "test";
- public static final String TEST_FAM = "test";
- public static final String COL_PREFIX = "col_";
- private final Logger logger = LoggerFactory.getLogger(getClass());
- private final HTablePool tablePool;
- private final HBaseAdmin admin;
- private final Configuration config;
- private final boolean useCompression;
- private final int numRows;
- private final int numThreads;
- private final int batchSize;
- private final int numRegions;
- public static void main(String[] args) throws Exception {
- Insert insert = new Insert();
- insert.exec();
- }
- public Insert() throws Exception {
- useCompression = true;
- numRows = 1000000;
- batchSize = 200;
- numThreads = 8;
- numRegions = 10;
- config = HBaseConfiguration.create();
- tablePool = new HTablePool(config, Integer.MAX_VALUE);
- admin = new HBaseAdmin(config);
- }
- public void exec() throws Exception {
- BoundedExecutor exec = new BoundedExecutor(numThreads);
- createTable();
- for (int i = 0; i < numRows / batchSize; i++) {
- exec.execute(this);
- }
- exec.shutdownAndWait();
- }
- public void createTable() throws Exception {
- if (admin.tableExists(TEST_TABLE)) {
- logger.info("Dropping table...");
- admin.disableTable(TEST_TABLE);
- admin.deleteTable(TEST_TABLE);
- }
- logger.info("Creating table...");
- HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
- HColumnDescriptor hcdEvents = new HColumnDescriptor(TEST_FAM);
- if (useCompression) {
- hcdEvents.setCompressionType(Algorithm.SNAPPY);
- }
- htd.addFamily(hcdEvents);
- admin.createTable(htd, new byte[] {0x10}, new byte[] {(byte) 0xF0}, numRegions);
- logger.info("Table created.");
- }
- @Override
- public void run() {
- List<Put> putList = new ArrayList<Put>();
- for (int i = 0; i < batchSize; i++) {
- putList.add(createPut());
- }
- long start = System.currentTimeMillis();
- insertBatch(putList);
- logger.info("PUT: {} ms", System.currentTimeMillis() - start);
- }
- private void insertBatch(List<Put> putList) {
- HTableInterface table = tablePool.getTable(TEST_TABLE);
- try {
- table.put(putList);
- } catch (IOException x) {
- x.printStackTrace();
- } finally {
- try {
- table.close();
- } catch (IOException x) {
- x.printStackTrace();
- }
- }
- }
- private Put createPut() {
- Put put = new Put(UUID.randomUUID().toString().getBytes());
- for (int i = 0; i < 100; i++) {
- String prefix = COL_PREFIX + i + "_";
- for (int j = 0; j < 20; j++) {
- String col = prefix + j;
- put.add(Bytes.toBytes(TEST_FAM), Bytes.toBytes(col), Bytes.toBytes("data"));
- }
- }
- return put;
- }
- }
Advertisement
Add Comment
Please sign in to add a comment
Advertisement