Guest User

Untitled

a guest
Nov 28th, 2011
159
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 43.83 KB | None | 0 0
  1. package org.apache.hadoop.hbase;
  2. /**
  3.  * Copyright 2007 The Apache Software Foundation
  4.  *
  5.  * Licensed to the Apache Software Foundation (ASF) under one
  6.  * or more contributor license agreements.  See the NOTICE file
  7.  * distributed with this work for additional information
  8.  * regarding copyright ownership.  The ASF licenses this file
  9.  * to you under the Apache License, Version 2.0 (the
  10.  * "License"); you may not use this file except in compliance
  11.  * with the License.  You may obtain a copy of the License at
  12.  *
  13.  *     http://www.apache.org/licenses/LICENSE-2.0
  14.  *
  15.  * Unless required by applicable law or agreed to in writing, software
  16.  * distributed under the License is distributed on an "AS IS" BASIS,
  17.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18.  * See the License for the specific language governing permissions and
  19.  * limitations under the License.
  20.  */
  21.  
  22.  
  23. import java.io.DataInput;
  24. import java.io.DataOutput;
  25. import java.io.File;
  26. import java.io.IOException;
  27. import java.io.PrintStream;
  28. import java.lang.reflect.Constructor;
  29. import java.text.SimpleDateFormat;
  30. import java.util.ArrayList;
  31. import java.util.Arrays;
  32. import java.util.Date;
  33. import java.util.List;
  34. import java.util.Map;
  35. import java.util.Random;
  36. import java.util.TreeMap;
  37. import java.util.regex.Matcher;
  38. import java.util.regex.Pattern;
  39.  
  40. import org.apache.commons.logging.Log;
  41. import org.apache.commons.logging.LogFactory;
  42. import org.apache.hadoop.conf.Configuration;
  43. import org.apache.hadoop.fs.FSDataInputStream;
  44. import org.apache.hadoop.fs.FileStatus;
  45. import org.apache.hadoop.fs.FileSystem;
  46. import org.apache.hadoop.fs.Path;
  47. import org.apache.hadoop.hbase.client.Get;
  48. import org.apache.hadoop.hbase.client.HBaseAdmin;
  49. import org.apache.hadoop.hbase.client.HTable;
  50. import org.apache.hadoop.hbase.client.Put;
  51. import org.apache.hadoop.hbase.client.Result;
  52. import org.apache.hadoop.hbase.client.ResultScanner;
  53. import org.apache.hadoop.hbase.client.Scan;
  54. import org.apache.hadoop.hbase.filter.BinaryComparator;
  55. import org.apache.hadoop.hbase.filter.CompareFilter;
  56. import org.apache.hadoop.hbase.filter.Filter;
  57. import org.apache.hadoop.hbase.filter.PageFilter;
  58. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  59. import org.apache.hadoop.hbase.filter.WhileMatchFilter;
  60. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  61. import org.apache.hadoop.hbase.util.Bytes;
  62. import org.apache.hadoop.hbase.util.FSUtils;
  63. import org.apache.hadoop.hbase.util.Hash;
  64. import org.apache.hadoop.hbase.util.MurmurHash;
  65. import org.apache.hadoop.hbase.util.Pair;
  66. import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
  67. import org.apache.hadoop.hdfs.MiniDFSCluster;
  68. import org.apache.hadoop.io.LongWritable;
  69. import org.apache.hadoop.io.NullWritable;
  70. import org.apache.hadoop.io.Text;
  71. import org.apache.hadoop.io.Writable;
  72. import org.apache.hadoop.mapreduce.InputSplit;
  73. import org.apache.hadoop.mapreduce.Job;
  74. import org.apache.hadoop.mapreduce.JobContext;
  75. import org.apache.hadoop.mapreduce.Mapper;
  76. import org.apache.hadoop.mapreduce.RecordReader;
  77. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  78. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  79. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  80. import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
  81. import org.apache.hadoop.util.LineReader;
  82.  
  83.  
  84. /**
  85.  * Script used evaluating HBase performance and scalability.  Runs a HBase
  86.  * client that steps through one of a set of hardcoded tests or 'experiments'
  87.  * (e.g. a random reads test, a random writes test, etc.). Pass on the
  88.  * command-line which test to run and how many clients are participating in
  89.  * this experiment. Run <code>java PerformanceEvaluation --help</code> to
  90.  * obtain usage.
  91.  *
  92.  * <p>This class sets up and runs the evaluation programs described in
  93.  * Section 7, <i>Performance Evaluation</i>, of the <a
  94.  * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
  95.  * paper, pages 8-10.
  96.  *
  97.  * <p>If number of clients > 1, we start up a MapReduce job. Each map task
  98.  * runs an individual client. Each client does about 1GB of data.
  99.  */
  100. public class PerformanceEvaluation2 {
  101.   protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation2.class.getName());
  102.  
  103.   private static final int ROW_LENGTH = 1000;
  104.   private static final int ONE_GB = 1024 * 1024 * 1000;
  105.   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
  106.  
  107.   public static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
  108.   public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
  109.   public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
  110.  
  111.   protected static final HTableDescriptor TABLE_DESCRIPTOR;
  112.   static {
  113.     TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
  114.     TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
  115.   }
  116.  
  117.   protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
  118.  
  119.   volatile Configuration conf;
  120.   private boolean miniCluster = false;
  121.   private boolean nomapred = false;
  122.   private int N = 1;
  123.   private int R = ROWS_PER_GB;
  124.   private boolean flushCommits = true;
  125.   private boolean writeToWAL = true;
  126.   private int presplitRegions = 0;
  127.   private int startRow = 0;
  128.   private int endRow = R;
  129.  
  130.   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
  131.   /**
  132.    * Regex to parse lines in input file passed to mapreduce task.
  133.    */
  134.   public static final Pattern LINE_PATTERN =
  135.     Pattern.compile("startRow=(\\d+),\\s+" +
  136.         "perClientRunRows=(\\d+),\\s+" +
  137.         "totalRows=(\\d+),\\s+" +
  138.         "clients=(\\d+),\\s+" +
  139.         "flushCommits=(\\w+),\\s+" +
  140.         "writeToWAL=(\\w+)");
  141.  
  142.   /**
  143.    * Enum for map metrics.  Keep it out here rather than inside in the Map
  144.    * inner-class so we can find associated properties.
  145.    */
  146.   protected static enum Counter {
  147.     /** elapsed time */
  148.     ELAPSED_TIME,
  149.     /** number of rows */
  150.     ROWS}
  151.  
  152.  
  153.   /**
  154.    * Constructor
  155.    * @param c Configuration object
  156.    */
  157.   public PerformanceEvaluation2(final Configuration c) {
  158.     this.conf = c;
  159.  
  160.     addCommandDescriptor(RandomReadTest.class, "randomRead",
  161.         "Run random read test");
  162.     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
  163.         "Run random seek and scan 100 test");
  164.     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
  165.         "Run random seek scan with both start and stop row (max 10 rows)");
  166.     addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
  167.         "Run random seek scan with both start and stop row (max 100 rows)");
  168.     addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
  169.         "Run random seek scan with both start and stop row (max 1000 rows)");
  170.     addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
  171.         "Run random seek scan with both start and stop row (max 10000 rows)");
  172.     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
  173.         "Run random write test");
  174.     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
  175.         "Run sequential read test");
  176.     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
  177.         "Run sequential write test");
  178.     addCommandDescriptor(ScanTest.class, "scan",
  179.         "Run scan test (read every row)");
  180.     addCommandDescriptor(FilteredScanTest.class, "filterScan",
  181.         "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
  182.   }
  183.  
  184.   protected void addCommandDescriptor(Class<? extends Test> cmdClass,
  185.       String name, String description) {
  186.     CmdDescriptor cmdDescriptor =
  187.       new CmdDescriptor(cmdClass, name, description);
  188.     commands.put(name, cmdDescriptor);
  189.   }
  190.  
  191.   /**
  192.    * Implementations can have their status set.
  193.    */
  194.   static interface Status {
  195.     /**
  196.      * Sets status
  197.      * @param msg status message
  198.      * @throws IOException
  199.      */
  200.     void setStatus(final String msg) throws IOException;
  201.   }
  202.  
  203.   /**
  204.    *  This class works as the InputSplit of Performance Evaluation
  205.    *  MapReduce InputFormat, and the Record Value of RecordReader.
  206.    *  Each map task will only read one record from a PeInputSplit,
  207.    *  the record value is the PeInputSplit itself.
  208.    */
  209.   public static class PeInputSplit extends InputSplit implements Writable {
  210.     private int startRow = 0;
  211.     private int rows = 0;
  212.     private int totalRows = 0;
  213.     private int clients = 0;
  214.     private boolean flushCommits = false;
  215.     private boolean writeToWAL = true;
  216.  
  217.     public PeInputSplit() {
  218.       this.startRow = 0;
  219.       this.rows = 0;
  220.       this.totalRows = 0;
  221.       this.clients = 0;
  222.       this.flushCommits = false;
  223.       this.writeToWAL = true;
  224.     }
  225.  
  226.     public PeInputSplit(int startRow, int rows, int totalRows, int clients,
  227.         boolean flushCommits, boolean writeToWAL) {
  228.       this.startRow = startRow;
  229.       this.rows = rows;
  230.       this.totalRows = totalRows;
  231.       this.clients = clients;
  232.       this.flushCommits = flushCommits;
  233.       this.writeToWAL = writeToWAL;
  234.     }
  235.  
  236.     @Override
  237.     public void readFields(DataInput in) throws IOException {
  238.       this.startRow = in.readInt();
  239.       this.rows = in.readInt();
  240.       this.totalRows = in.readInt();
  241.       this.clients = in.readInt();
  242.       this.flushCommits = in.readBoolean();
  243.       this.writeToWAL = in.readBoolean();
  244.     }
  245.  
  246.     @Override
  247.     public void write(DataOutput out) throws IOException {
  248.       out.writeInt(startRow);
  249.       out.writeInt(rows);
  250.       out.writeInt(totalRows);
  251.       out.writeInt(clients);
  252.       out.writeBoolean(flushCommits);
  253.       out.writeBoolean(writeToWAL);
  254.     }
  255.  
  256.     @Override
  257.     public long getLength() throws IOException, InterruptedException {
  258.       return 0;
  259.     }
  260.  
  261.     @Override
  262.     public String[] getLocations() throws IOException, InterruptedException {
  263.       return new String[0];
  264.     }
  265.  
  266.     public int getStartRow() {
  267.       return startRow;
  268.     }
  269.  
  270.     public int getRows() {
  271.       return rows;
  272.     }
  273.  
  274.     public int getTotalRows() {
  275.       return totalRows;
  276.     }
  277.  
  278.     public int getClients() {
  279.       return clients;
  280.     }
  281.  
  282.     public boolean isFlushCommits() {
  283.       return flushCommits;
  284.     }
  285.  
  286.     public boolean isWriteToWAL() {
  287.       return writeToWAL;
  288.     }
  289.   }
  290.  
  291.   /**
  292.    *  InputFormat of Performance Evaluation MapReduce job.
  293.    *  It extends from FileInputFormat, want to use it's methods such as setInputPaths().
  294.    */
  295.   public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
  296.  
  297.     @Override
  298.     public List<InputSplit> getSplits(JobContext job) throws IOException {
  299.       // generate splits
  300.       List<InputSplit> splitList = new ArrayList<InputSplit>();
  301.  
  302.       for (FileStatus file: listStatus(job)) {
  303.         Path path = file.getPath();
  304.         FileSystem fs = path.getFileSystem(job.getConfiguration());
  305.         FSDataInputStream fileIn = fs.open(path);
  306.         LineReader in = new LineReader(fileIn, job.getConfiguration());
  307.         int lineLen = 0;
  308.         while(true) {
  309.           Text lineText = new Text();
  310.           lineLen = in.readLine(lineText);
  311.           if(lineLen <= 0) {
  312.           break;
  313.           }
  314.           Matcher m = LINE_PATTERN.matcher(lineText.toString());
  315.           if((m != null) && m.matches()) {
  316.             int startRow = Integer.parseInt(m.group(1));
  317.             int rows = Integer.parseInt(m.group(2));
  318.             int totalRows = Integer.parseInt(m.group(3));
  319.             int clients = Integer.parseInt(m.group(4));
  320.             boolean flushCommits = Boolean.parseBoolean(m.group(5));
  321.             boolean writeToWAL = Boolean.parseBoolean(m.group(6));
  322.  
  323.             LOG.debug("split["+ splitList.size() + "] " +
  324.                      " startRow=" + startRow +
  325.                      " rows=" + rows +
  326.                      " totalRows=" + totalRows +
  327.                      " clients=" + clients +
  328.                      " flushCommits=" + flushCommits +
  329.                      " writeToWAL=" + writeToWAL);
  330.  
  331.             PeInputSplit newSplit =
  332.               new PeInputSplit(startRow, rows, totalRows, clients,
  333.                 flushCommits, writeToWAL);
  334.             splitList.add(newSplit);
  335.           }
  336.         }
  337.         in.close();
  338.       }
  339.  
  340.       LOG.info("Total # of splits: " + splitList.size());
  341.       return splitList;
  342.     }
  343.  
  344.     @Override
  345.     public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
  346.                             TaskAttemptContext context) {
  347.       return new PeRecordReader();
  348.     }
  349.  
  350.     public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
  351.       private boolean readOver = false;
  352.       private PeInputSplit split = null;
  353.       private NullWritable key = null;
  354.       private PeInputSplit value = null;
  355.  
  356.       @Override
  357.       public void initialize(InputSplit split, TaskAttemptContext context)
  358.                   throws IOException, InterruptedException {
  359.         this.readOver = false;
  360.         this.split = (PeInputSplit)split;
  361.       }
  362.  
  363.       @Override
  364.       public boolean nextKeyValue() throws IOException, InterruptedException {
  365.         if(readOver) {
  366.           return false;
  367.         }
  368.  
  369.         key = NullWritable.get();
  370.         value = (PeInputSplit)split;
  371.  
  372.         readOver = true;
  373.         return true;
  374.       }
  375.  
  376.       @Override
  377.       public NullWritable getCurrentKey() throws IOException, InterruptedException {
  378.         return key;
  379.       }
  380.  
  381.       @Override
  382.       public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
  383.         return value;
  384.       }
  385.  
  386.       @Override
  387.       public float getProgress() throws IOException, InterruptedException {
  388.         if(readOver) {
  389.           return 1.0f;
  390.         } else {
  391.           return 0.0f;
  392.         }
  393.       }
  394.  
  395.       @Override
  396.       public void close() throws IOException {
  397.         // do nothing
  398.       }
  399.     }
  400.   }
  401.  
  402.   /**
  403.    * MapReduce job that runs a performance evaluation client in each map task.
  404.    */
  405.   public static class EvaluationMapTask
  406.       extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
  407.  
  408.     /** configuration parameter name that contains the command */
  409.     public final static String CMD_KEY = "EvaluationMapTask.command";
  410.     /** configuration parameter name that contains the PE impl */
  411.     public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
  412.  
  413.     private Class<? extends Test> cmd;
  414.     private PerformanceEvaluation2 pe;
  415.  
  416.     @Override
  417.     protected void setup(Context context) throws IOException, InterruptedException {
  418.       this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
  419.  
  420.       // this is required so that extensions of PE are instantiated within the
  421.       // map reduce task...
  422.       Class<? extends PerformanceEvaluation2> peClass =
  423.           forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation2.class);
  424.       try {
  425.         this.pe = peClass.getConstructor(Configuration.class)
  426.             .newInstance(context.getConfiguration());
  427.       } catch (Exception e) {
  428.         throw new IllegalStateException("Could not instantiate PE instance", e);
  429.       }
  430.     }
  431.  
  432.     private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
  433.       Class<? extends Type> clazz = null;
  434.       try {
  435.         clazz = Class.forName(className).asSubclass(type);
  436.       } catch (ClassNotFoundException e) {
  437.         throw new IllegalStateException("Could not find class for name: " + className, e);
  438.       }
  439.       return clazz;
  440.     }
  441.  
  442.     protected void map(NullWritable key, PeInputSplit value, final Context context)
  443.            throws IOException, InterruptedException {
  444.  
  445.       Status status = new Status() {
  446.         public void setStatus(String msg) {
  447.            context.setStatus(msg);
  448.         }
  449.       };
  450.  
  451.       // Evaluation task
  452.       long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
  453.                                   value.getRows(), value.getTotalRows(),
  454.                                   value.isFlushCommits(), value.isWriteToWAL(),
  455.                                   status);
  456.       // Collect how much time the thing took. Report as map output and
  457.       // to the ELAPSED_TIME counter.
  458.       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
  459.       context.getCounter(Counter.ROWS).increment(value.rows);
  460.       context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
  461.       context.progress();
  462.     }
  463.   }
  464.  
  465.   /*
  466.    * If table does not already exist, create.
  467.    * @param c Client to use checking.
  468.    * @return True if we created the table.
  469.    * @throws IOException
  470.    */
  471.   private boolean checkTable(HBaseAdmin admin) throws IOException {
  472.    
  473.     HTableDescriptor tableDescriptor = getTableDescriptor();
  474.     if (this.presplitRegions > 0) {
  475.       // presplit requested
  476.       if (admin.tableExists(tableDescriptor.getName())) {
  477.         admin.disableTable(tableDescriptor.getName());
  478.         admin.deleteTable(tableDescriptor.getName());
  479.         try {
  480.           Thread.sleep(3000);
  481.         } catch (InterruptedException e) {
  482.         }
  483.       }
  484.      
  485.       byte[][] splits = getSplits();
  486.       LOG.info ("Creating tables with splits : ");
  487.       for (int i=0; i < splits.length; i++)
  488.       {
  489.         LOG.info(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
  490.       }
  491.       admin.createTable(tableDescriptor, splits);
  492.       boolean tableExists = admin.tableExists(tableDescriptor.getName());
  493.       return tableExists;
  494.     }
  495.     else {
  496.       boolean tableExists = admin.tableExists(tableDescriptor.getName());
  497.       if (!tableExists) {
  498.         admin.createTable(tableDescriptor);
  499.         LOG.info("Table " + tableDescriptor + " created");
  500.       }
  501.       return !tableExists;
  502.     }
  503.   }
  504.  
  505.   protected HTableDescriptor getTableDescriptor() {
  506.     return TABLE_DESCRIPTOR;
  507.   }
  508.  
  509.   public  byte[][] getSplits() {
  510.     byte[][] splits = new byte[this.presplitRegions - 1][];
  511.     int jump = (this.endRow - this.startRow)  / (this.presplitRegions+1);
  512.     for (int i=0; i < this.presplitRegions-1; i++)
  513.     {
  514.       int rowkey = this.startRow + jump * i;
  515.       splits[i] = format(rowkey);
  516.     }
  517.     return splits;
  518.   }
  519.  
  520.  
  521.  
  522.   /*
  523.    * We're to run multiple clients concurrently.  Setup a mapreduce job.  Run
  524.    * one map per client.  Then run a single reduce to sum the elapsed times.
  525.    * @param cmd Command to run.
  526.    * @throws IOException
  527.    */
  528.   private void runNIsMoreThanOne(final Class<? extends Test> cmd)
  529.   throws IOException, InterruptedException, ClassNotFoundException {
  530.     checkTable(new HBaseAdmin(conf));
  531.     if (this.nomapred) {
  532.       doMultipleClients(cmd);
  533.     } else {
  534.       doMapReduce(cmd);
  535.     }
  536.   }
  537.  
  538.   /*
  539.    * Run all clients in this vm each to its own thread.
  540.    * @param cmd Command to run.
  541.    * @throws IOException
  542.    */
  543.   private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
  544.     final List<Thread> threads = new ArrayList<Thread>(this.N);
  545.     final int perClientRows = R/N;
  546.     for (int i = 0; i < this.N; i++) {
  547.       Thread t = new Thread (Integer.toString(i)) {
  548.         @Override
  549.         public void run() {
  550.           super.run();
  551.           PerformanceEvaluation2 pe = new PerformanceEvaluation2(conf);
  552.           int index = Integer.parseInt(getName());
  553.           try {
  554.             long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
  555.                perClientRows, R,
  556.                 flushCommits, writeToWAL, new Status() {
  557.                   public void setStatus(final String msg) throws IOException {
  558.                     LOG.info("client-" + getName() + " " + msg);
  559.                   }
  560.                 });
  561.             LOG.info("Finished " + getName() + " in " + elapsedTime +
  562.               "ms writing " + perClientRows + " rows");
  563.           } catch (IOException e) {
  564.             throw new RuntimeException(e);
  565.           }
  566.         }
  567.       };
  568.       threads.add(t);
  569.     }
  570.     for (Thread t: threads) {
  571.       t.start();
  572.     }
  573.     for (Thread t: threads) {
  574.       while(t.isAlive()) {
  575.         try {
  576.           t.join();
  577.         } catch (InterruptedException e) {
  578.           LOG.debug("Interrupted, continuing" + e.toString());
  579.         }
  580.       }
  581.     }
  582.   }
  583.  
  584.   /*
  585.    * Run a mapreduce job.  Run as many maps as asked-for clients.
  586.    * Before we start up the job, write out an input file with instruction
  587.    * per client regards which row they are to start on.
  588.    * @param cmd Command to run.
  589.    * @throws IOException
  590.    */
  591.   private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
  592.         InterruptedException, ClassNotFoundException {
  593.     Path inputDir = writeInputFile(this.conf);
  594.     this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
  595.     this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
  596.     Job job = new Job(this.conf);
  597.     job.setJarByClass(PerformanceEvaluation2.class);
  598.     job.setJobName("HBase Performance Evaluation");
  599.  
  600.     job.setInputFormatClass(PeInputFormat.class);
  601.     PeInputFormat.setInputPaths(job, inputDir);
  602.  
  603.     job.setOutputKeyClass(LongWritable.class);
  604.     job.setOutputValueClass(LongWritable.class);
  605.  
  606.     job.setMapperClass(EvaluationMapTask.class);
  607.     job.setReducerClass(LongSumReducer.class);
  608.  
  609.     job.setNumReduceTasks(1);
  610.  
  611.     job.setOutputFormatClass(TextOutputFormat.class);
  612.     TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
  613.  
  614.     TableMapReduceUtil.addDependencyJars(job);
  615.     job.waitForCompletion(true);
  616.   }
  617.  
  618.   /*
  619.    * Write input file of offsets-per-client for the mapreduce job.
  620.    * @param c Configuration
  621.    * @return Directory that contains file written.
  622.    * @throws IOException
  623.    */
  624.   private Path writeInputFile(final Configuration c) throws IOException {
  625.     FileSystem fs = FileSystem.get(c);
  626.     if (!fs.exists(PERF_EVAL_DIR)) {
  627.       fs.mkdirs(PERF_EVAL_DIR);
  628.     }
  629.     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
  630.     Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
  631.     fs.mkdirs(subdir);
  632.     Path inputFile = new Path(subdir, "input.txt");
  633.     PrintStream out = new PrintStream(fs.create(inputFile));
  634.     // Make input random.
  635.     Map<Integer, String> m = new TreeMap<Integer, String>();
  636.     Hash h = MurmurHash.getInstance();
  637.     int perClientRows = (this.R / this.N);
  638.     try {
  639.       for (int i = 0; i < 10; i++) {
  640.         for (int j = 0; j < N; j++) {
  641.           String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
  642.           ", perClientRunRows=" + (perClientRows / 10) +
  643.           ", totalRows=" + this.R +
  644.           ", clients=" + this.N +
  645.           ", flushCommits=" + this.flushCommits +
  646.           ", writeToWAL=" + this.writeToWAL;
  647.           int hash = h.hash(Bytes.toBytes(s));
  648.           m.put(hash, s);
  649.         }
  650.       }
  651.       for (Map.Entry<Integer, String> e: m.entrySet()) {
  652.         out.println(e.getValue());
  653.       }
  654.     } finally {
  655.       out.close();
  656.     }
  657.     return subdir;
  658.   }
  659.  
  660.   /**
  661.    * Describes a command.
  662.    */
  663.   static class CmdDescriptor {
  664.     private Class<? extends Test> cmdClass;
  665.     private String name;
  666.     private String description;
  667.  
  668.     CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
  669.       this.cmdClass = cmdClass;
  670.       this.name = name;
  671.       this.description = description;
  672.     }
  673.  
  674.     public Class<? extends Test> getCmdClass() {
  675.       return cmdClass;
  676.     }
  677.  
  678.     public String getName() {
  679.       return name;
  680.     }
  681.  
  682.     public String getDescription() {
  683.       return description;
  684.     }
  685.   }
  686.  
  687.   /**
  688.    * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation2.Test
  689.    * tests}.  This makes the reflection logic a little easier to understand...
  690.    */
  691.   static class TestOptions {
  692.     private int startRow;
  693.     private int perClientRunRows;
  694.     private int totalRows;
  695.     private byte[] tableName;
  696.     private boolean flushCommits;
  697.     private boolean writeToWAL = true;
  698.  
  699.     TestOptions() {
  700.     }
  701.  
  702.     TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, boolean flushCommits, boolean writeToWAL) {
  703.       this.startRow = startRow;
  704.       this.perClientRunRows = perClientRunRows;
  705.       this.totalRows = totalRows;
  706.       this.tableName = tableName;
  707.       this.flushCommits = flushCommits;
  708.       this.writeToWAL = writeToWAL;
  709.     }
  710.  
  711.     public int getStartRow() {
  712.       return startRow;
  713.     }
  714.  
  715.     public int getPerClientRunRows() {
  716.       return perClientRunRows;
  717.     }
  718.  
  719.     public int getTotalRows() {
  720.       return totalRows;
  721.     }
  722.  
  723.     public byte[] getTableName() {
  724.       return tableName;
  725.     }
  726.  
  727.     public boolean isFlushCommits() {
  728.       return flushCommits;
  729.     }
  730.  
  731.     public boolean isWriteToWAL() {
  732.       return writeToWAL;
  733.     }
  734.   }
  735.  
  736.   /*
  737.    * A test.
  738.    * Subclass to particularize what happens per row.
  739.    */
  740.   static abstract class Test {
  741.     // Below is make it so when Tests are all running in the one
  742.     // jvm, that they each have a differently seeded Random.
  743.     private static final Random randomSeed =
  744.       new Random(System.currentTimeMillis());
  745.     private static long nextRandomSeed() {
  746.       return randomSeed.nextLong();
  747.     }
  748.     protected final Random rand = new Random(nextRandomSeed());
  749.  
  750.     protected final int startRow;
  751.     protected final int perClientRunRows;
  752.     protected final int totalRows;
  753.     private final Status status;
  754.     protected byte[] tableName;
  755.     protected HBaseAdmin admin;
  756.     protected HTable table;
  757.     protected volatile Configuration conf;
  758.     protected boolean flushCommits;
  759.     protected boolean writeToWAL;
  760.  
  761.     /**
  762.      * Note that all subclasses of this class must provide a public contructor
  763.      * that has the exact same list of arguments.
  764.      */
  765.     Test(final Configuration conf, final TestOptions options, final Status status) {
  766.       super();
  767.       this.startRow = options.getStartRow();
  768.       this.perClientRunRows = options.getPerClientRunRows();
  769.       this.totalRows = options.getTotalRows();
  770.       this.status = status;
  771.       this.tableName = options.getTableName();
  772.       this.table = null;
  773.       this.conf = conf;
  774.       this.flushCommits = options.isFlushCommits();
  775.       this.writeToWAL = options.isWriteToWAL();
  776.     }
  777.  
  778.     private String generateStatus(final int sr, final int i, final int lr) {
  779.       return sr + "/" + i + "/" + lr;
  780.     }
  781.  
  782.     protected int getReportingPeriod() {
  783.       int period = this.perClientRunRows / 10;
  784.       return period == 0? this.perClientRunRows: period;
  785.     }
  786.  
  787.     void testSetup() throws IOException {
  788.       this.admin = new HBaseAdmin(conf);
  789.       this.table = new HTable(conf, tableName);
  790.       this.table.setAutoFlush(false);
  791.       this.table.setScannerCaching(30);
  792.     }
  793.  
  794.     void testTakedown()  throws IOException {
  795.       if (flushCommits) {
  796.         this.table.flushCommits();
  797.       }
  798.     }
  799.  
  800.     /*
  801.      * Run test
  802.      * @return Elapsed time.
  803.      * @throws IOException
  804.      */
  805.     long test() throws IOException {
  806.       long elapsedTime;
  807.       testSetup();
  808.       long startTime = System.currentTimeMillis();
  809.       try {
  810.         testTimed();
  811.         elapsedTime = System.currentTimeMillis() - startTime;
  812.       } finally {
  813.         testTakedown();
  814.       }
  815.       return elapsedTime;
  816.     }
  817.  
  818.     /**
  819.      * Provides an extension point for tests that don't want a per row invocation.
  820.      */
  821.     void testTimed() throws IOException {
  822.       int lastRow = this.startRow + this.perClientRunRows;
  823.       // Report on completion of 1/10th of total.
  824.       for (int i = this.startRow; i < lastRow; i++) {
  825.         testRow(i);
  826.         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
  827.           status.setStatus(generateStatus(this.startRow, i, lastRow));
  828.         }
  829.       }
  830.     }
  831.  
  832.     /*
  833.     * Test for individual row.
  834.     * @param i Row index.
  835.     */
  836.     void testRow(final int i) throws IOException {
  837.     }
  838.   }
  839.  
  840.   @SuppressWarnings("unused")
  841.   static class RandomSeekScanTest extends Test {
  842.     RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
  843.       super(conf, options, status);
  844.     }
  845.  
  846.     @Override
  847.     void testRow(final int i) throws IOException {
  848.       Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
  849.       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
  850.       scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
  851.       ResultScanner s = this.table.getScanner(scan);
  852.       //int count = 0;
  853.       for (Result rr = null; (rr = s.next()) != null;) {
  854.         // LOG.info("" + count++ + " " + rr.toString());
  855.       }
  856.       s.close();
  857.     }
  858.  
  859.     @Override
  860.     protected int getReportingPeriod() {
  861.       int period = this.perClientRunRows / 100;
  862.       return period == 0? this.perClientRunRows: period;
  863.     }
  864.  
  865.   }
  866.  
  867.   @SuppressWarnings("unused")
  868.   static abstract class RandomScanWithRangeTest extends Test {
  869.     RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
  870.       super(conf, options, status);
  871.     }
  872.  
  873.     @Override
  874.     void testRow(final int i) throws IOException {
  875.       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
  876.       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
  877.       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
  878.       ResultScanner s = this.table.getScanner(scan);
  879.       int count = 0;
  880.       for (Result rr = null; (rr = s.next()) != null;) {
  881.         count++;
  882.       }
  883.  
  884.       if (i % 100 == 0) {
  885.         LOG.info(String.format("Scan for key range %s - %s returned %s rows",
  886.             Bytes.toString(startAndStopRow.getFirst()),
  887.             Bytes.toString(startAndStopRow.getSecond()), count));
  888.       }
  889.  
  890.       s.close();
  891.     }
  892.  
  893.     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
  894.  
  895.     protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
  896.       int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
  897.       int stop = start + maxRange;
  898.       return new Pair<byte[],byte[]>(format(start), format(stop));
  899.     }
  900.  
  901.     @Override
  902.     protected int getReportingPeriod() {
  903.       int period = this.perClientRunRows / 100;
  904.       return period == 0? this.perClientRunRows: period;
  905.     }
  906.   }
  907.  
  908.   static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
  909.     RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
  910.       super(conf, options, status);
  911.     }
  912.  
  913.     @Override
  914.     protected Pair<byte[], byte[]> getStartAndStopRow() {
  915.       return generateStartAndStopRows(10);
  916.     }
  917.   }
  918.  
  919.   static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
  920.     RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
  921.       super(conf, options, status);
  922.     }
  923.  
  924.     @Override
  925.     protected Pair<byte[], byte[]> getStartAndStopRow() {
  926.       return generateStartAndStopRows(100);
  927.     }
  928.   }
  929.  
  930.   static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
  931.     RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
  932.       super(conf, options, status);
  933.     }
  934.  
  935.     @Override
  936.     protected Pair<byte[], byte[]> getStartAndStopRow() {
  937.       return generateStartAndStopRows(1000);
  938.     }
  939.   }
  940.  
  941.   static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
  942.     RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
  943.       super(conf, options, status);
  944.     }
  945.  
  946.     @Override
  947.     protected Pair<byte[], byte[]> getStartAndStopRow() {
  948.       return generateStartAndStopRows(10000);
  949.     }
  950.   }
  951.  
  952.   static class RandomReadTest extends Test {
  953.     RandomReadTest(Configuration conf, TestOptions options, Status status) {
  954.       super(conf, options, status);
  955.     }
  956.  
  957.     @Override
  958.     void testRow(final int i) throws IOException {
  959.       Get get = new Get(getRandomRow(this.rand, this.totalRows));
  960.       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
  961.       this.table.get(get);
  962.     }
  963.  
  964.     @Override
  965.     protected int getReportingPeriod() {
  966.       int period = this.perClientRunRows / 100;
  967.       return period == 0? this.perClientRunRows: period;
  968.     }
  969.  
  970.   }
  971.  
  972.   static class RandomWriteTest extends Test {
  973.     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
  974.       super(conf, options, status);
  975.     }
  976.  
  977.     @Override
  978.     void testRow(final int i) throws IOException {
  979.       byte [] row = getRandomRow(this.rand, this.totalRows);
  980.       Put put = new Put(row);
  981.       byte[] value = generateValue(this.rand);
  982.       put.add(FAMILY_NAME, QUALIFIER_NAME, value);
  983.       put.setWriteToWAL(writeToWAL);
  984.       table.put(put);
  985.     }
  986.   }
  987.  
  988.   static class ScanTest extends Test {
  989.     private ResultScanner testScanner;
  990.  
  991.     ScanTest(Configuration conf, TestOptions options, Status status) {
  992.       super(conf, options, status);
  993.     }
  994.  
  995.     @Override
  996.     void testSetup() throws IOException {
  997.       super.testSetup();
  998.     }
  999.  
  1000.     @Override
  1001.     void testTakedown() throws IOException {
  1002.       if (this.testScanner != null) {
  1003.         this.testScanner.close();
  1004.       }
  1005.       super.testTakedown();
  1006.     }
  1007.  
  1008.  
  1009.     @Override
  1010.     void testRow(final int i) throws IOException {
  1011.       if (this.testScanner == null) {
  1012.         Scan scan = new Scan(format(this.startRow));
  1013.         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
  1014.         this.testScanner = table.getScanner(scan);
  1015.       }
  1016.       testScanner.next();
  1017.     }
  1018.  
  1019.   }
  1020.  
  1021.   static class SequentialReadTest extends Test {
  1022.     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
  1023.       super(conf, options, status);
  1024.     }
  1025.  
  1026.     @Override
  1027.     void testRow(final int i) throws IOException {
  1028.       Get get = new Get(format(i));
  1029.       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
  1030.       table.get(get);
  1031.     }
  1032.  
  1033.   }
  1034.  
  1035.   static class SequentialWriteTest extends Test {
  1036.     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
  1037.       super(conf, options, status);
  1038.     }
  1039.  
  1040.     @Override
  1041.     void testRow(final int i) throws IOException {
  1042.       Put put = new Put(format(i));
  1043.       byte[] value = generateValue(this.rand);
  1044.       put.add(FAMILY_NAME, QUALIFIER_NAME, value);
  1045.       put.setWriteToWAL(writeToWAL);
  1046.       table.put(put);
  1047.     }
  1048.  
  1049.   }
  1050.  
  1051.   static class FilteredScanTest extends Test {
  1052.     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
  1053.  
  1054.     FilteredScanTest(Configuration conf, TestOptions options, Status status) {
  1055.       super(conf, options, status);
  1056.     }
  1057.  
  1058.     @Override
  1059.     void testRow(int i) throws IOException {
  1060.       byte[] value = generateValue(this.rand);
  1061.       Scan scan = constructScan(value);
  1062.       ResultScanner scanner = null;
  1063.       try {
  1064.         scanner = this.table.getScanner(scan);
  1065.         while (scanner.next() != null) {
  1066.         }
  1067.       } finally {
  1068.         if (scanner != null) scanner.close();
  1069.       }
  1070.     }
  1071.  
  1072.     protected Scan constructScan(byte[] valuePrefix) throws IOException {
  1073.       Filter filter = new SingleColumnValueFilter(
  1074.           FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
  1075.           new BinaryComparator(valuePrefix)
  1076.       );
  1077.       Scan scan = new Scan();
  1078.       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
  1079.       scan.setFilter(filter);
  1080.       return scan;
  1081.     }
  1082.   }
  1083.  
  1084.   /*
  1085.    * Format passed integer.
  1086.    * @param number
  1087.    * @return Returns zero-prefixed 10-byte wide decimal version of passed
  1088.    * number (Does absolute in case number is negative).
  1089.    */
  1090.   public static byte [] format(final int number) {
  1091.     byte [] b = new byte[10];
  1092.     int d = Math.abs(number);
  1093.     for (int i = b.length - 1; i >= 0; i--) {
  1094.       b[i] = (byte)((d % 10) + '0');
  1095.       d /= 10;
  1096.     }
  1097.     return b;
  1098.   }
  1099.  
  1100.   /*
  1101.    * This method takes some time and is done inline uploading data.  For
  1102.    * example, doing the mapfile test, generation of the key and value
  1103.    * consumes about 30% of CPU time.
  1104.    * @return Generated random value to insert into a table cell.
  1105.    */
  1106.   public static byte[] generateValue(final Random r) {
  1107.     byte [] b = new byte [ROW_LENGTH];
  1108.     r.nextBytes(b);
  1109.     return b;
  1110.   }
  1111.  
  1112.   static byte [] getRandomRow(final Random random, final int totalRows) {
  1113.     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
  1114.   }
  1115.  
  1116.   long runOneClient(final Class<? extends Test> cmd, final int startRow,
  1117.                     final int perClientRunRows, final int totalRows,
  1118.                     boolean flushCommits, boolean writeToWAL,
  1119.                     final Status status)
  1120.   throws IOException {
  1121.     status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
  1122.       perClientRunRows + " rows");
  1123.     long totalElapsedTime = 0;
  1124.  
  1125.     Test t = null;
  1126.     TestOptions options = new TestOptions(startRow, perClientRunRows,
  1127.         totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL);
  1128.     try {
  1129.       Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
  1130.           Configuration.class, TestOptions.class, Status.class);
  1131.       t = constructor.newInstance(this.conf, options, status);
  1132.     } catch (NoSuchMethodException e) {
  1133.       throw new IllegalArgumentException("Invalid command class: " +
  1134.           cmd.getName() + ".  It does not provide a constructor as described by" +
  1135.           "the javadoc comment.  Available constructors are: " +
  1136.           Arrays.toString(cmd.getConstructors()));
  1137.     } catch (Exception e) {
  1138.       throw new IllegalStateException("Failed to construct command class", e);
  1139.     }
  1140.     totalElapsedTime = t.test();
  1141.  
  1142.     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
  1143.       "ms at offset " + startRow + " for " + perClientRunRows + " rows");
  1144.     return totalElapsedTime;
  1145.   }
  1146.  
  1147.   private void runNIsOne(final Class<? extends Test> cmd) {
  1148.     Status status = new Status() {
  1149.       public void setStatus(String msg) throws IOException {
  1150.         LOG.info(msg);
  1151.       }
  1152.     };
  1153.  
  1154.     HBaseAdmin admin = null;
  1155.     try {
  1156.       admin = new HBaseAdmin(this.conf);
  1157.       checkTable(admin);
  1158.       runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL,
  1159.         status);
  1160.     } catch (Exception e) {
  1161.       LOG.error("Failed", e);
  1162.     }
  1163.   }
  1164.  
  1165.   private void runTest(final Class<? extends Test> cmd) throws IOException,
  1166.           InterruptedException, ClassNotFoundException {
  1167.     MiniHBaseCluster hbaseMiniCluster = null;
  1168.     MiniDFSCluster dfsCluster = null;
  1169.     MiniZooKeeperCluster zooKeeperCluster = null;
  1170.     if (this.miniCluster) {
  1171.       dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null);
  1172.       zooKeeperCluster = new MiniZooKeeperCluster();
  1173.       int zooKeeperPort = zooKeeperCluster.startup(new File(System.getProperty("java.io.tmpdir")));
  1174.  
  1175.       // mangle the conf so that the fs parameter points to the minidfs we
  1176.       // just started up
  1177.       FileSystem fs = dfsCluster.getFileSystem();
  1178.       conf.set("fs.default.name", fs.getUri().toString());
  1179.       conf.set("hbase.zookeeper.property.clientPort", Integer.toString(zooKeeperPort));
  1180.       Path parentdir = fs.getHomeDirectory();
  1181.       conf.set(HConstants.HBASE_DIR, parentdir.toString());
  1182.       fs.mkdirs(parentdir);
  1183.       FSUtils.setVersion(fs, parentdir);
  1184.       hbaseMiniCluster = new MiniHBaseCluster(this.conf, N);
  1185.     }
  1186.  
  1187.     try {
  1188.       if (N == 1) {
  1189.         // If there is only one client and one HRegionServer, we assume nothing
  1190.         // has been set up at all.
  1191.         runNIsOne(cmd);
  1192.       } else {
  1193.         // Else, run
  1194.         runNIsMoreThanOne(cmd);
  1195.       }
  1196.     } finally {
  1197.       if(this.miniCluster) {
  1198.         if (hbaseMiniCluster != null) hbaseMiniCluster.shutdown();
  1199.         if (zooKeeperCluster != null) zooKeeperCluster.shutdown();
  1200.         HBaseTestCase.shutdownDfs(dfsCluster);
  1201.       }
  1202.     }
  1203.   }
  1204.  
  1205.   protected void printUsage() {
  1206.     printUsage(null);
  1207.   }
  1208.  
  1209.   protected void printUsage(final String message) {
  1210.     if (message != null && message.length() > 0) {
  1211.       System.err.println(message);
  1212.     }
  1213.     System.err.println("Usage: java " + this.getClass().getName() + " \\");
  1214.     System.err.println("  [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>");
  1215.     System.err.println();
  1216.     System.err.println("Options:");
  1217.     System.err.println(" miniCluster     Run the test on an HBaseMiniCluster");
  1218.     System.err.println(" nomapred        Run multiple clients using threads " +
  1219.       "(rather than use mapreduce)");
  1220.     System.err.println(" rows            Rows each client runs. Default: One million");
  1221.     System.err.println(" flushCommits    Used to determine if the test should flush the table.  Default: false");
  1222.     System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
  1223.     System.err.println();
  1224.     System.err.println("Command:");
  1225.     for (CmdDescriptor command : commands.values()) {
  1226.       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
  1227.     }
  1228.     System.err.println();
  1229.     System.err.println("Args:");
  1230.     System.err.println(" nclients        Integer. Required. Total number of " +
  1231.       "clients (and HRegionServers)");
  1232.     System.err.println("                 running: 1 <= value <= 500");
  1233.     System.err.println("Examples:");
  1234.     System.err.println(" To run a single evaluation client:");
  1235.     System.err.println(" $ bin/hbase " + this.getClass().getName()
  1236.         + " sequentialWrite 1");
  1237.   }
  1238.  
  1239.   private void getArgs(final int start, final String[] args) {
  1240.     if(start + 1 > args.length) {
  1241.       throw new IllegalArgumentException("must supply the number of clients");
  1242.     }
  1243.     N = Integer.parseInt(args[start]);
  1244.     if (N < 1) {
  1245.       throw new IllegalArgumentException("Number of clients must be > 1");
  1246.     }
  1247.     // Set total number of rows to write.
  1248.     this.R = this.R * N;
  1249.   }
  1250.  
  1251.   public int doCommandLine(final String[] args) {
  1252.     // Process command-line args. TODO: Better cmd-line processing
  1253.     // (but hopefully something not as painful as cli options).
  1254.     int errCode = -1;
  1255.     if (args.length < 1) {
  1256.       printUsage();
  1257.       return errCode;
  1258.     }
  1259.  
  1260.     try {
  1261.       for (int i = 0; i < args.length; i++) {
  1262.         String cmd = args[i];
  1263.         if (cmd.equals("-h") || cmd.startsWith("--h")) {
  1264.           printUsage();
  1265.           errCode = 0;
  1266.           break;
  1267.         }
  1268.  
  1269.         final String miniClusterArgKey = "--miniCluster";
  1270.         if (cmd.startsWith(miniClusterArgKey)) {
  1271.           this.miniCluster = true;
  1272.           continue;
  1273.         }
  1274.  
  1275.         final String nmr = "--nomapred";
  1276.         if (cmd.startsWith(nmr)) {
  1277.           this.nomapred = true;
  1278.           continue;
  1279.         }
  1280.  
  1281.         final String rows = "--rows=";
  1282.         if (cmd.startsWith(rows)) {
  1283.           this.R = Integer.parseInt(cmd.substring(rows.length()));
  1284.           continue;
  1285.         }
  1286.  
  1287.         final String flushCommits = "--flushCommits=";
  1288.         if (cmd.startsWith(flushCommits)) {
  1289.           this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
  1290.           continue;
  1291.         }
  1292.  
  1293.         final String writeToWAL = "--writeToWAL=";
  1294.         if (cmd.startsWith(writeToWAL)) {
  1295.           this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
  1296.           continue;
  1297.         }
  1298.        
  1299.         final String presplit = "--presplit=";
  1300.         if (cmd.startsWith(presplit)) {
  1301.           this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
  1302.           continue;
  1303.         }
  1304.  
  1305.         Class<? extends Test> cmdClass = determineCommandClass(cmd);
  1306.         if (cmdClass != null) {
  1307.           getArgs(i + 1, args);
  1308.           runTest(cmdClass);
  1309.           errCode = 0;
  1310.           break;
  1311.         }
  1312.  
  1313.         printUsage();
  1314.         break;
  1315.       }
  1316.     } catch (Exception e) {
  1317.       e.printStackTrace();
  1318.     }
  1319.  
  1320.     return errCode;
  1321.   }
  1322.  
  1323.   private Class<? extends Test> determineCommandClass(String cmd) {
  1324.     CmdDescriptor descriptor = commands.get(cmd);
  1325.     return descriptor != null ? descriptor.getCmdClass() : null;
  1326.   }
  1327.  
  1328.   /**
  1329.    * @param args
  1330.    */
  1331.   public static void main(final String[] args) {
  1332.     Configuration c = HBaseConfiguration.create();
  1333.     System.exit(new PerformanceEvaluation2(c).doCommandLine(args));
  1334.   }
  1335. }
  1336.  
  1337.  
Advertisement
Add Comment
Please, Sign In to add comment