Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Aug 3rd, 2012  |  syntax: None  |  size: 15.95 KB  |  hits: 9  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18.  
  19. package org.apache.hadoop.hive.ql.index.compact;
  20.  
  21. import java.io.Serializable;
  22. import java.util.ArrayList;
  23. import java.util.Collection;
  24. import java.util.HashSet;
  25. import java.util.LinkedHashMap;
  26. import java.util.List;
  27. import java.util.Set;
  28.  
  29. import org.apache.commons.logging.Log;
  30. import org.apache.commons.logging.LogFactory;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.hive.conf.HiveConf;
  33. import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  34. import org.apache.hadoop.hive.metastore.api.FieldSchema;
  35. import org.apache.hadoop.hive.metastore.api.Index;
  36. import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
  37. import org.apache.hadoop.hive.metastore.api.Table;
  38. import org.apache.hadoop.hive.ql.Driver;
  39. import org.apache.hadoop.hive.ql.exec.FilterOperator;
  40. import org.apache.hadoop.hive.ql.exec.Operator;
  41. import org.apache.hadoop.hive.ql.exec.Task;
  42. import org.apache.hadoop.hive.ql.hooks.ReadEntity;
  43. import org.apache.hadoop.hive.ql.hooks.WriteEntity;
  44. import org.apache.hadoop.hive.ql.index.HiveIndexQueryContext;
  45. import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
  46. import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
  47. import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
  48. import org.apache.hadoop.hive.ql.io.HiveInputFormat;
  49. import org.apache.hadoop.hive.ql.metadata.HiveException;
  50. import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
  51. import org.apache.hadoop.hive.ql.metadata.HiveUtils;
  52. import org.apache.hadoop.hive.ql.metadata.Partition;
  53. import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
  54. import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
  55. import org.apache.hadoop.hive.ql.parse.ParseContext;
  56. import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
  57. import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
  58. import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
  59. import org.apache.hadoop.hive.ql.plan.MapredWork;
  60. import org.apache.hadoop.hive.ql.plan.PartitionDesc;
  61. import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
  62. import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
  63. import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
  64. import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
  65. import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
  66. import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
  67. import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
  68.  
  69. public class CompactIndexHandler extends TableBasedIndexHandler {
  70.  
  71.   private Configuration configuration;
  72.   // The names of the partition columns
  73.   private Set<String> partitionCols;
  74.   // Whether or not the conditions have been met to use the fact the index is sorted
  75.   private boolean useSorted;
  76.   private static final Log LOG = LogFactory.getLog(CompactIndexHandler.class.getName());
  77.  
  78.  
  79.   @Override
  80.   public void analyzeIndexDefinition(Table baseTable, Index index,
  81.       Table indexTable) throws HiveException {
  82.     StorageDescriptor storageDesc = index.getSd();
  83.     if (this.usesIndexTable() && indexTable != null) {
  84.       StorageDescriptor indexTableSd = storageDesc.deepCopy();
  85.       List<FieldSchema> indexTblCols = indexTableSd.getCols();
  86.       FieldSchema bucketFileName = new FieldSchema("_bucketname", "string", "");
  87.       indexTblCols.add(bucketFileName);
  88.       FieldSchema offSets = new FieldSchema("_offsets", "array<bigint>", "");
  89.       indexTblCols.add(offSets);
  90.       indexTable.setSd(indexTableSd);
  91.     }
  92.   }
  93.  
  94.   @Override
  95.   protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
  96.       List<FieldSchema> indexField, boolean partitioned,
  97.       PartitionDesc indexTblPartDesc, String indexTableName,
  98.       PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
  99.  
  100.     String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
  101.  
  102.     //form a new insert overwrite query.
  103.     StringBuilder command= new StringBuilder();
  104.     LinkedHashMap<String, String> partSpec = indexTblPartDesc.getPartSpec();
  105.  
  106.     command.append("INSERT OVERWRITE TABLE " + HiveUtils.unparseIdentifier(indexTableName ));
  107.     if (partitioned && indexTblPartDesc != null) {
  108.       command.append(" PARTITION ( ");
  109.       List<String> ret = getPartKVPairStringArray(partSpec);
  110.       for (int i = 0; i < ret.size(); i++) {
  111.         String partKV = ret.get(i);
  112.         command.append(partKV);
  113.         if (i < ret.size() - 1) {
  114.           command.append(",");
  115.         }
  116.       }
  117.       command.append(" ) ");
  118.     }
  119.  
  120.     command.append(" SELECT ");
  121.     command.append(indexCols);
  122.     command.append(",");
  123.  
  124.     command.append(VirtualColumn.FILENAME.getName());
  125.     command.append(",");
  126.     command.append(" collect_set (");
  127.     command.append(VirtualColumn.BLOCKOFFSET.getName());
  128.     command.append(") ");
  129.     command.append(" FROM " + HiveUtils.unparseIdentifier(baseTableName) );
  130.     LinkedHashMap<String, String> basePartSpec = baseTablePartDesc.getPartSpec();
  131.     if(basePartSpec != null) {
  132.       command.append(" WHERE ");
  133.       List<String> pkv = getPartKVPairStringArray(basePartSpec);
  134.       for (int i = 0; i < pkv.size(); i++) {
  135.         String partKV = pkv.get(i);
  136.         command.append(partKV);
  137.         if (i < pkv.size() - 1) {
  138.           command.append(" AND ");
  139.         }
  140.       }
  141.     }
  142.     command.append(" GROUP BY ");
  143.     command.append(indexCols + ", " + VirtualColumn.FILENAME.getName());
  144.  
  145.     HiveConf builderConf = new HiveConf(getConf(), CompactIndexHandler.class);
  146.     builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
  147.     builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
  148.     Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
  149.         command, partSpec, indexTableName, dbName);
  150.     return rootTask;
  151.   }
  152.  
  153.   @Override
  154.   public void generateIndexQuery(List<Index> indexes, ExprNodeDesc predicate,
  155.     ParseContext pctx, HiveIndexQueryContext queryContext) {
  156.  
  157.     Index index = indexes.get(0);
  158.     DecomposedPredicate decomposedPredicate = decomposePredicate(predicate, index,
  159.                                                                   queryContext.getQueryPartitions());
  160.  
  161.     if (decomposedPredicate == null) {
  162.       queryContext.setQueryTasks(null);
  163.       return; // abort if we couldn't pull out anything from the predicate
  164.     }
  165.  
  166.     // pass residual predicate back out for further processing
  167.     queryContext.setResidualPredicate(decomposedPredicate.residualPredicate);
  168.     // setup TableScanOperator to change input format for original query
  169.     queryContext.setIndexInputFormat(HiveCompactIndexInputFormat.class.getName());
  170.  
  171.     // Build reentrant QL for index query
  172.     StringBuilder qlCommand = new StringBuilder("INSERT OVERWRITE DIRECTORY ");
  173.  
  174.     String tmpFile = pctx.getContext().getMRTmpFileURI();
  175.     queryContext.setIndexIntermediateFile(tmpFile);
  176.     qlCommand.append( "\"" + tmpFile + "\" "); // QL includes " around file name
  177.     qlCommand.append("SELECT `_bucketname` , `_offsets` FROM ");
  178.     qlCommand.append(HiveUtils.unparseIdentifier(index.getIndexTableName()));
  179.     qlCommand.append(" WHERE ");
  180.  
  181.     String predicateString = decomposedPredicate.pushedPredicate.getExprString();
  182.     qlCommand.append(predicateString);
  183.  
  184.     // generate tasks from index query string
  185.     LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString());
  186.     HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class);
  187.     HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false);
  188.     Driver driver = new Driver(queryConf);
  189.     driver.compile(qlCommand.toString(), false);
  190.  
  191.     if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
  192.       // For now, only works if the predicate is a single condition
  193.       MapredWork work = null;
  194.       String originalInputFormat = null;
  195.       for (Task task : driver.getPlan().getRootTasks()) {
  196.         // The index query should have one and only one map reduce task in the root tasks
  197.         // Otherwise something is wrong, log the problem and continue using the default format
  198.         if (task.getWork() instanceof MapredWork) {
  199.           if (work != null) {
  200.             LOG.error("Tried to use a binary search on a compact index but there were an " +
  201.                       "unexpected number (>1) of root level map reduce tasks in the " +
  202.                       "reentrant query plan.");
  203.             work.setInputformat(null);
  204.             work.setInputFormatSorted(false);
  205.             break;
  206.           }
  207.           work = (MapredWork)task.getWork();
  208.           String inputFormat = work.getInputformat();
  209.           originalInputFormat = inputFormat;
  210.           if (inputFormat == null) {
  211.             inputFormat = HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVEINPUTFORMAT);
  212.           }
  213.  
  214.           // We can only perform a binary search with HiveInputFormat and CombineHiveInputFormat
  215.           // and BucketizedHiveInputFormat
  216.           try {
  217.             if (!HiveInputFormat.class.isAssignableFrom(Class.forName(inputFormat))) {
  218.               work = null;
  219.               break;
  220.             }
  221.           } catch (ClassNotFoundException e) {
  222.             LOG.error("Map reduce work's input format class: " + inputFormat + " was not found. " +
  223.                        "Cannot use the fact the compact index is sorted.");
  224.             work = null;
  225.             break;
  226.           }
  227.  
  228.           work.setInputFormatSorted(true);
  229.         }
  230.       }
  231.  
  232.       if (work != null) {
  233.         // Find the filter operator and expr node which act on the index column and mark them
  234.         if (!findIndexColumnFilter(work.getAliasToWork().values())) {
  235.           LOG.error("Could not locate the index column's filter operator and expr node. Cannot " +
  236.                     "use the fact the compact index is sorted.");
  237.           work.setInputformat(originalInputFormat);
  238.           work.setInputFormatSorted(false);
  239.         }
  240.       }
  241.     }
  242.  
  243.  
  244.     queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs());
  245.     queryContext.setQueryTasks(driver.getPlan().getRootTasks());
  246.     return;
  247.   }
  248.  
  249.   /**
  250. * Does a depth first search on the operator tree looking for a filter operator whose predicate
  251. * has one child which is a column which is not in the partition
  252. * @param operators
  253. * @return whether or not it has found its target
  254. */
  255.   private boolean findIndexColumnFilter(Collection<Operator<? extends Serializable>> operators) {
  256.     for (Operator<? extends Serializable> op : operators) {
  257.       if (op instanceof FilterOperator && ((FilterOperator)op).getConf().getPredicate().getChildren() != null) {
  258.         // Is this the target
  259.         if (findIndexColumnExprNodeDesc(((FilterOperator)op).getConf().getPredicate())) {
  260.           ((FilterOperator)op).getConf().setSortedFilter(true);
  261.           return true;
  262.         }
  263.       }
  264.  
  265.       // If the target has been found, no need to continue
  266.       if (findIndexColumnFilter(op.getChildOperators())) {
  267.         return true;
  268.       }
  269.     }
  270.     return false;
  271.   }
  272.  
  273.   private boolean findIndexColumnExprNodeDesc(ExprNodeDesc expression) {
  274.     if (expression.getChildren() == null) {
  275.       return false;
  276.     }
  277.  
  278.     if (expression.getChildren().size() == 2) {
  279.       ExprNodeColumnDesc columnDesc = null;
  280.       if (expression.getChildren().get(0) instanceof ExprNodeColumnDesc) {
  281.         columnDesc = (ExprNodeColumnDesc)expression.getChildren().get(0);
  282.       } else if (expression.getChildren().get(1) instanceof ExprNodeColumnDesc) {
  283.         columnDesc = (ExprNodeColumnDesc)expression.getChildren().get(1);
  284.       }
  285.  
  286.       // Is this the target
  287.       if (columnDesc != null && !partitionCols.contains(columnDesc.getColumn())) {
  288.         assert expression instanceof ExprNodeGenericFuncDesc :
  289.                "Expression containing index column is does not support sorting, should not try" +
  290.                "and sort";
  291.         ((ExprNodeGenericFuncDesc)expression).setSortedExpr(true);
  292.         return true;
  293.       }
  294.     }
  295.  
  296.     for (ExprNodeDesc child : expression.getChildren()) {
  297.       // If the target has been found, no need to continue
  298.       if (findIndexColumnExprNodeDesc(child)) {
  299.         return true;
  300.       }
  301.     }
  302.     return false;
  303.   }
  304.  
  305.   /**
  306. * Split the predicate into the piece we can deal with (pushed), and the one we can't (residual)
  307. * @param predicate
  308. * @param index
  309. * @return
  310. */
  311.   private DecomposedPredicate decomposePredicate(ExprNodeDesc predicate, Index index,
  312.       Set<Partition> queryPartitions) {
  313.     IndexPredicateAnalyzer analyzer = getIndexPredicateAnalyzer(index, queryPartitions);
  314.     List<IndexSearchCondition> searchConditions = new ArrayList<IndexSearchCondition>();
  315.     // split predicate into pushed (what we can handle), and residual (what we can't handle)
  316.     ExprNodeDesc residualPredicate = analyzer.analyzePredicate(predicate, searchConditions);
  317.  
  318.     if (searchConditions.size() == 0) {
  319.       return null;
  320.     }
  321.  
  322.     int numIndexCols = 0;
  323.     for (IndexSearchCondition searchCondition : searchConditions) {
  324.       if (!partitionCols.contains(searchCondition.getColumnDesc().getColumn())) {
  325.         numIndexCols++;
  326.       }
  327.     }
  328.  
  329.     // For now, only works if the predicate has a single condition on an index column
  330.     if (numIndexCols == 1) {
  331.       useSorted = true;
  332.     } else {
  333.       useSorted = false;
  334.     }
  335.  
  336.     DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
  337.     decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions);
  338.     decomposedPredicate.residualPredicate = residualPredicate;
  339.  
  340.     return decomposedPredicate;
  341.   }
  342.  
  343.   /**
  344. * Instantiate a new predicate analyzer suitable for determining
  345. * whether we can use an index, based on rules for indexes in
  346. * WHERE clauses that we support
  347. *
  348. * @return preconfigured predicate analyzer for WHERE queries
  349. */
  350.   private IndexPredicateAnalyzer getIndexPredicateAnalyzer(Index index, Set<Partition> queryPartitions) {
  351.     IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
  352.  
  353.     analyzer.addComparisonOp(GenericUDFOPEqual.class.getName());
  354.     analyzer.addComparisonOp(GenericUDFOPLessThan.class.getName());
  355.     analyzer.addComparisonOp(GenericUDFOPEqualOrLessThan.class.getName());
  356.     analyzer.addComparisonOp(GenericUDFOPGreaterThan.class.getName());
  357.     analyzer.addComparisonOp(GenericUDFOPEqualOrGreaterThan.class.getName());
  358.  
  359.     // only return results for columns in this index
  360.     List<FieldSchema> columnSchemas = index.getSd().getCols();
  361.     for (FieldSchema column : columnSchemas) {
  362.       analyzer.allowColumnName(column.getName());
  363.     }
  364.  
  365.     // partitioned columns are treated as if they have indexes so that the partitions
  366.     // are used during the index query generation
  367.     partitionCols = new HashSet<String>();
  368.     for (Partition part : queryPartitions) {
  369.       if (part.getSpec().isEmpty()) {
  370.         continue; // empty partitions are from whole tables, so we don't want to add them in
  371.       }
  372.       for (String column : part.getSpec().keySet()) {
  373.         analyzer.allowColumnName(column);
  374.         partitionCols.add(column);
  375.       }
  376.     }
  377.  
  378.     return analyzer;
  379.   }
  380.  
  381.  
  382.   @Override
  383.   public boolean checkQuerySize(long querySize, HiveConf hiveConf) {
  384.     long minSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER_COMPACT_MINSIZE);
  385.     long maxSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER_COMPACT_MAXSIZE);
  386.     if (maxSize < 0) {
  387.       maxSize = Long.MAX_VALUE;
  388.     }
  389.     return (querySize > minSize & querySize < maxSize);
  390.   }
  391.  
  392.   @Override
  393.   public boolean usesIndexTable() {
  394.     return true;
  395.   }
  396.  
  397. }