Advertisement
Guest User

HBase insert

a guest
Jan 11th, 2013
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.83 KB | None | 0 0
  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.UUID;
  5.  
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.hbase.HBaseConfiguration;
  8. import org.apache.hadoop.hbase.HColumnDescriptor;
  9. import org.apache.hadoop.hbase.HTableDescriptor;
  10. import org.apache.hadoop.hbase.client.HBaseAdmin;
  11. import org.apache.hadoop.hbase.client.HTableInterface;
  12. import org.apache.hadoop.hbase.client.HTablePool;
  13. import org.apache.hadoop.hbase.client.Put;
  14. import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
  15. import org.apache.hadoop.hbase.util.Bytes;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18.  
  19. public class Insert implements Runnable {
  20.  
  21.     public static final String TEST_TABLE = "test";
  22.     public static final String TEST_FAM = "test";
  23.     public static final String COL_PREFIX = "col_";
  24.  
  25.     private final Logger logger = LoggerFactory.getLogger(getClass());
  26.  
  27.     private final HTablePool tablePool;
  28.     private final HBaseAdmin admin;
  29.     private final Configuration config;
  30.  
  31.     private final boolean useCompression;
  32.     private final int numRows;
  33.     private final int numThreads;
  34.     private final int batchSize;
  35.     private final int numRegions;
  36.  
  37.     public static void main(String[] args) throws Exception {
  38.         Insert insert = new Insert();
  39.         insert.exec();
  40.     }
  41.  
  42.     public Insert() throws Exception {
  43.         useCompression = true;
  44.         numRows = 1000000;
  45.         batchSize = 200;
  46.         numThreads = 8;
  47.         numRegions = 10;
  48.  
  49.         config = HBaseConfiguration.create();
  50.  
  51.         tablePool = new HTablePool(config, Integer.MAX_VALUE);
  52.         admin = new HBaseAdmin(config);
  53.     }
  54.  
  55.     public void exec() throws Exception {
  56.         BoundedExecutor exec = new BoundedExecutor(numThreads);
  57.  
  58.         createTable();
  59.  
  60.         for (int i = 0; i < numRows / batchSize; i++) {
  61.             exec.execute(this);
  62.         }
  63.  
  64.         exec.shutdownAndWait();
  65.     }
  66.  
  67.     public void createTable() throws Exception {
  68.         if (admin.tableExists(TEST_TABLE)) {
  69.             logger.info("Dropping table...");
  70.             admin.disableTable(TEST_TABLE);
  71.             admin.deleteTable(TEST_TABLE);
  72.         }
  73.  
  74.         logger.info("Creating table...");
  75.         HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
  76.         HColumnDescriptor hcdEvents = new HColumnDescriptor(TEST_FAM);
  77.         if (useCompression) {
  78.             hcdEvents.setCompressionType(Algorithm.SNAPPY);
  79.         }
  80.  
  81.         htd.addFamily(hcdEvents);
  82.         admin.createTable(htd, new byte[] {0x10}, new byte[] {(byte) 0xF0}, numRegions);
  83.         logger.info("Table created.");
  84.     }
  85.  
  86.     @Override
  87.     public void run() {
  88.         List<Put> putList = new ArrayList<Put>();
  89.         for (int i = 0; i < batchSize; i++) {
  90.             putList.add(createPut());
  91.         }
  92.         long start = System.currentTimeMillis();
  93.         insertBatch(putList);
  94.         logger.info("PUT: {} ms", System.currentTimeMillis() - start);
  95.     }
  96.  
  97.     private void insertBatch(List<Put> putList) {
  98.         HTableInterface table = tablePool.getTable(TEST_TABLE);
  99.         try {
  100.             table.put(putList);
  101.         } catch (IOException x) {
  102.             x.printStackTrace();
  103.         } finally {
  104.             try {
  105.                 table.close();
  106.             } catch (IOException x) {
  107.                 x.printStackTrace();
  108.             }
  109.         }
  110.     }
  111.  
  112.     private Put createPut() {
  113.         Put put = new Put(UUID.randomUUID().toString().getBytes());
  114.         for (int i = 0; i < 100; i++) {
  115.             String prefix = COL_PREFIX + i + "_";
  116.             for (int j = 0; j < 20; j++) {
  117.                 String col = prefix + j;
  118.                 put.add(Bytes.toBytes(TEST_FAM), Bytes.toBytes(col), Bytes.toBytes("data"));
  119.             }
  120.         }
  121.         return put;
  122.     }
  123.  
  124. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement