- scala> sql("SET spark.sql.codegen.aggregate.splitAggregateFunc.enabled=true")
- scala> spark.range(10).selectExpr("id a", "id b", "id c").write.saveAsTable("t")
- scala> import org.apache.spark.sql.execution.debug._
- scala> sql("SELECT SUM(a + b), SUM(a + b + c) FROM t").debugCodegen
- == Subtree 2 / 2 ==
- *(1) HashAggregate(keys=[], functions=[partial_sum((a#21L + b#22L)), partial_sum(((a#21L + b#22L) + c#23L))], output=[sum#63L, sum#64L])
- +- *(1) ColumnarToRow
- +- FileScan parquet default.t[a#21L,b#22L,c#23L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint,c:bigint>
- Generated code:
- /* 001 */ public Object generate(Object[] references) {
- /* 002 */ return new GeneratedIteratorForCodegenStage1(references);
- /* 003 */ }
- /* 004 */
- /* 005 */ // codegenStageId=1
- /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
- /* 007 */ private Object[] references;
- /* 008 */ private scala.collection.Iterator[] inputs;
- /* 009 */ private boolean agg_initAgg_0;
- /* 010 */ private boolean agg_bufIsNull_0;
- /* 011 */ private long agg_bufValue_0;
- /* 012 */ private boolean agg_bufIsNull_1;
- /* 013 */ private long agg_bufValue_1;
- /* 014 */ private int columnartorow_batchIdx_0;
- /* 015 */ private boolean agg_agg_isNull_8_0;
- /* 016 */ private boolean agg_agg_isNull_10_0;
- /* 017 */ private boolean agg_agg_isNull_15_0;
- /* 018 */ private boolean agg_agg_isNull_17_0;
- /* 019 */ private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[3];
- /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
- /* 021 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1];
- /* 022 */ private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1];
- /* 023 */
- /* 024 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
- /* 025 */ this.references = references;
- /* 026 */ }
- /* 027 */
- /* 028 */ public void init(int index, scala.collection.Iterator[] inputs) {
- /* 029 */ partitionIndex = index;
- /* 030 */ this.inputs = inputs;
- /* 031 */
- /* 032 */ columnartorow_mutableStateArray_0[0] = inputs[0];
- /* 033 */ columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
- /* 034 */ columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
- /* 035 */ columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
- /* 036 */
- /* 037 */ }
- /* 038 */
- /* 039 */ private void agg_doAggregateVal_coalesce_0(long agg_expr_0_0, boolean agg_localBufIsNull_0, long agg_localBufValue_0, boolean agg_exprIsNull_0_0, boolean agg_exprIsNull_1_0, long agg_value_5, long agg_expr_1_0, boolean agg_isNull_5) throws java.io.IOException {
- /* 040 */ // evaluate aggregate function
- /* 041 */ agg_agg_isNull_8_0 = true;
- /* 042 */ long agg_value_8 = -1L;
- /* 043 */ do {
- /* 044 */ boolean agg_isNull_9 = true;
- /* 045 */ long agg_value_9 = -1L;
- /* 046 */ agg_agg_isNull_10_0 = true;
- /* 047 */ long agg_value_10 = -1L;
- /* 048 */ do {
- /* 049 */ if (!agg_localBufIsNull_0) {
- /* 050 */ agg_agg_isNull_10_0 = false;
- /* 051 */ agg_value_10 = agg_localBufValue_0;
- /* 052 */ continue;
- /* 053 */ }
- /* 054 */
- /* 055 */ boolean agg_isNull_12 = false;
- /* 056 */ long agg_value_12 = -1L;
- /* 057 */ if (!false) {
- /* 058 */ agg_value_12 = (long) 0;
- /* 059 */ }
- /* 060 */ if (!agg_isNull_12) {
- /* 061 */ agg_agg_isNull_10_0 = false;
- /* 062 */ agg_value_10 = agg_value_12;
- /* 063 */ continue;
- /* 064 */ }
- /* 065 */
- /* 066 */ } while (false);
- /* 067 */
- /* 068 */ if (!agg_isNull_5) {
- /* 069 */ agg_isNull_9 = false; // resultCode could change nullability.
- /* 070 */
- /* 071 */ agg_value_9 = agg_value_10 + agg_value_5;
- /* 072 */
- /* 073 */ }
- /* 074 */ if (!agg_isNull_9) {
- /* 075 */ agg_agg_isNull_8_0 = false;
- /* 076 */ agg_value_8 = agg_value_9;
- /* 077 */ continue;
- /* 078 */ }
- /* 079 */
- /* 080 */ if (!agg_localBufIsNull_0) {
- /* 081 */ agg_agg_isNull_8_0 = false;
- /* 082 */ agg_value_8 = agg_localBufValue_0;
- /* 083 */ continue;
- /* 084 */ }
- /* 085 */
- /* 086 */ } while (false);
- /* 087 */ // update aggregation buffer
- /* 088 */ agg_bufIsNull_0 = agg_agg_isNull_8_0;
- /* 089 */ agg_bufValue_0 = agg_value_8;
- /* 090 */
- /* 091 */ }
- /* 092 */
- /* 093 */ private void agg_doAggregateWithoutKey_0() throws java.io.IOException {
- /* 094 */ // initialize aggregation buffer
- /* 095 */ agg_bufIsNull_0 = true;
- /* 096 */ agg_bufValue_0 = -1L;
- /* 097 */ agg_bufIsNull_1 = true;
- /* 098 */ agg_bufValue_1 = -1L;
- /* 099 */
- /* 100 */ if (columnartorow_mutableStateArray_1[0] == null) {
- /* 101 */ columnartorow_nextBatch_0();
- /* 102 */ }
- /* 103 */ while ( columnartorow_mutableStateArray_1[0] != null) {
- /* 104 */ int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
- /* 105 */ int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0;
- /* 106 */ for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
- /* 107 */ int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
- /* 108 */ boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
- /* 109 */ long columnartorow_value_0 = columnartorow_isNull_0 ? -1L : (columnartorow_mutableStateArray_2[0].getLong(columnartorow_rowIdx_0));
- /* 110 */ boolean columnartorow_isNull_1 = columnartorow_mutableStateArray_2[1].isNullAt(columnartorow_rowIdx_0);
- /* 111 */ long columnartorow_value_1 = columnartorow_isNull_1 ? -1L : (columnartorow_mutableStateArray_2[1].getLong(columnartorow_rowIdx_0));
- /* 112 */ boolean columnartorow_isNull_2 = columnartorow_mutableStateArray_2[2].isNullAt(columnartorow_rowIdx_0);
- /* 113 */ long columnartorow_value_2 = columnartorow_isNull_2 ? -1L : (columnartorow_mutableStateArray_2[2].getLong(columnartorow_rowIdx_0));
- /* 114 */
- /* 115 */ agg_doConsume_0(columnartorow_value_0, columnartorow_isNull_0, columnartorow_value_1, columnartorow_isNull_1, columnartorow_value_2, columnartorow_isNull_2);
- /* 116 */ // shouldStop check is eliminated
- /* 117 */ }
- /* 118 */ columnartorow_batchIdx_0 = columnartorow_numRows_0;
- /* 119 */ columnartorow_mutableStateArray_1[0] = null;
- /* 120 */ columnartorow_nextBatch_0();
- /* 121 */ }
- /* 122 */
- /* 123 */ }
- /* 124 */
- /* 125 */ private void agg_doAggregateVal_coalesce_1(long agg_expr_0_0, boolean agg_exprIsNull_0_0, boolean agg_exprIsNull_2_0, long agg_expr_2_0, boolean agg_localBufIsNull_1, boolean agg_exprIsNull_1_0, long agg_value_5, long agg_expr_1_0, long agg_localBufValue_1, boolean agg_isNull_5) throws java.io.IOException {
- /* 126 */ // evaluate aggregate function
- /* 127 */ agg_agg_isNull_15_0 = true;
- /* 128 */ long agg_value_15 = -1L;
- /* 129 */ do {
- /* 130 */ boolean agg_isNull_16 = true;
- /* 131 */ long agg_value_16 = -1L;
- /* 132 */ agg_agg_isNull_17_0 = true;
- /* 133 */ long agg_value_17 = -1L;
- /* 134 */ do {
- /* 135 */ if (!agg_localBufIsNull_1) {
- /* 136 */ agg_agg_isNull_17_0 = false;
- /* 137 */ agg_value_17 = agg_localBufValue_1;
- /* 138 */ continue;
- /* 139 */ }
- /* 140 */
- /* 141 */ boolean agg_isNull_19 = false;
- /* 142 */ long agg_value_19 = -1L;
- /* 143 */ if (!false) {
- /* 144 */ agg_value_19 = (long) 0;
- /* 145 */ }
- /* 146 */ if (!agg_isNull_19) {
- /* 147 */ agg_agg_isNull_17_0 = false;
- /* 148 */ agg_value_17 = agg_value_19;
- /* 149 */ continue;
- /* 150 */ }
- /* 151 */
- /* 152 */ } while (false);
- /* 153 */ boolean agg_isNull_21 = true;
- /* 154 */ long agg_value_21 = -1L;
- /* 155 */
- /* 156 */ if (!agg_isNull_5) {
- /* 157 */ if (!agg_exprIsNull_2_0) {
- /* 158 */ agg_isNull_21 = false; // resultCode could change nullability.
- /* 159 */
- /* 160 */ agg_value_21 = agg_value_5 + agg_expr_2_0;
- /* 161 */
- /* 162 */ }
- /* 163 */
- /* 164 */ }
- /* 165 */ if (!agg_isNull_21) {
- /* 166 */ agg_isNull_16 = false; // resultCode could change nullability.
- /* 167 */
- /* 168 */ agg_value_16 = agg_value_17 + agg_value_21;
- /* 169 */
- /* 170 */ }
- /* 171 */ if (!agg_isNull_16) {
- /* 172 */ agg_agg_isNull_15_0 = false;
- /* 173 */ agg_value_15 = agg_value_16;
- /* 174 */ continue;
- /* 175 */ }
- /* 176 */
- /* 177 */ if (!agg_localBufIsNull_1) {
- /* 178 */ agg_agg_isNull_15_0 = false;
- /* 179 */ agg_value_15 = agg_localBufValue_1;
- /* 180 */ continue;
- /* 181 */ }
- /* 182 */
- /* 183 */ } while (false);
- /* 184 */ // update aggregation buffer
- /* 185 */ agg_bufIsNull_1 = agg_agg_isNull_15_0;
- /* 186 */ agg_bufValue_1 = agg_value_15;
- /* 187 */
- /* 188 */ }
- /* 189 */
- /* 190 */ private void columnartorow_nextBatch_0() throws java.io.IOException {
- /* 191 */ if (columnartorow_mutableStateArray_0[0].hasNext()) {
- /* 192 */ columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
- /* 193 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numInputBatches */).add(1);
- /* 194 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
- /* 195 */ columnartorow_batchIdx_0 = 0;
- /* 196 */ columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0);
- /* 197 */ columnartorow_mutableStateArray_2[1] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(1);
- /* 198 */ columnartorow_mutableStateArray_2[2] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(2);
- /* 199 */
- /* 200 */ }
- /* 201 */ }
- /* 202 */
- /* 203 */ private void agg_doConsume_0(long agg_expr_0_0, boolean agg_exprIsNull_0_0, long agg_expr_1_0, boolean agg_exprIsNull_1_0, long agg_expr_2_0, boolean agg_exprIsNull_2_0) throws java.io.IOException {
- /* 204 */ // do aggregate
- /* 205 */ // copy aggregation buffer to the local
- /* 206 */ boolean agg_localBufIsNull_0 = agg_bufIsNull_0;
- /* 207 */ long agg_localBufValue_0 = agg_bufValue_0;
- /* 208 */ boolean agg_localBufIsNull_1 = agg_bufIsNull_1;
- /* 209 */ long agg_localBufValue_1 = agg_bufValue_1;
- /* 210 */ // common sub-expressions
- // NOTE: `agg_value_5` holds the common subexpression `a + b`. This variable is detected by
- // `CodeGenerator.getLocalInputVariableValues` and passed as a parameter into the split
- // aggregate functions `agg_doAggregateVal_coalesce_0` and `agg_doAggregateVal_coalesce_1` below:
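- // (In `agg_doAggregateVal_coalesce_0` the sum buffer is updated with `agg_value_5` directly
- // (`agg_value_10 + agg_value_5`), while in `agg_doAggregateVal_coalesce_1` it is first combined
- // with `c` (`agg_value_21 = agg_value_5 + agg_expr_2_0`) before updating the buffer.)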
- /* 211 */ boolean agg_isNull_5 = true;
- /* 212 */ long agg_value_5 = -1L;
- /* 213 */
- /* 214 */ if (!agg_exprIsNull_0_0) {
- /* 215 */ if (!agg_exprIsNull_1_0) {
- /* 216 */ agg_isNull_5 = false; // resultCode could change nullability.
- /* 217 */
- /* 218 */ agg_value_5 = agg_expr_0_0 + agg_expr_1_0;
- /* 219 */
- /* 220 */ }
- /* 221 */
- /* 222 */ }
- /* 223 */ // process aggregate functions to update aggregation buffer
- /* 224 */ agg_doAggregateVal_coalesce_0(agg_expr_0_0, agg_localBufIsNull_0, agg_localBufValue_0, agg_exprIsNull_0_0, agg_exprIsNull_1_0, agg_value_5, agg_expr_1_0, agg_isNull_5);
- /* 225 */ agg_doAggregateVal_coalesce_1(agg_expr_0_0, agg_exprIsNull_0_0, agg_exprIsNull_2_0, agg_expr_2_0, agg_localBufIsNull_1, agg_exprIsNull_1_0, agg_value_5, agg_expr_1_0, agg_localBufValue_1, agg_isNull_5);
- /* 226 */
- /* 227 */ }
- /* 228 */
- /* 229 */ protected void processNext() throws java.io.IOException {
- /* 230 */ while (!agg_initAgg_0) {
- /* 231 */ agg_initAgg_0 = true;
- /* 232 */ long agg_beforeAgg_0 = System.nanoTime();
- /* 233 */ agg_doAggregateWithoutKey_0();
- /* 234 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* aggTime */).add((System.nanoTime() - agg_beforeAgg_0) / 1000000);
- /* 235 */
- /* 236 */ // output the result
- /* 237 */
- /* 238 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
- /* 239 */ columnartorow_mutableStateArray_3[2].reset();
- /* 240 */
- /* 241 */ columnartorow_mutableStateArray_3[2].zeroOutNullBytes();
- /* 242 */
- /* 243 */ if (agg_bufIsNull_0) {
- /* 244 */ columnartorow_mutableStateArray_3[2].setNullAt(0);
- /* 245 */ } else {
- /* 246 */ columnartorow_mutableStateArray_3[2].write(0, agg_bufValue_0);
- /* 247 */ }
- /* 248 */
- /* 249 */ if (agg_bufIsNull_1) {
- /* 250 */ columnartorow_mutableStateArray_3[2].setNullAt(1);
- /* 251 */ } else {
- /* 252 */ columnartorow_mutableStateArray_3[2].write(1, agg_bufValue_1);
- /* 253 */ }
- /* 254 */ append((columnartorow_mutableStateArray_3[2].getRow()));
- /* 255 */ }
- /* 256 */ }
- /* 257 */
- /* 258 */ }
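- // NOTE: To see the effect of the flag, one can re-run debugCodegen with the split disabled
- // (a sketch, assuming the same spark-shell session and table `t` as above); without the split,
- // the partial_sum update code is expected to be generated inline in `agg_doConsume_0` instead of
- // being factored out into `agg_doAggregateVal_coalesce_0` / `agg_doAggregateVal_coalesce_1`:
- scala> sql("SET spark.sql.codegen.aggregate.splitAggregateFunc.enabled=false")
- scala> sql("SELECT SUM(a + b), SUM(a + b + c) FROM t").debugCodegen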