Advertisement
Guest User

Untitled

a guest
Aug 21st, 2019
133
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.79 KB | None | 0 0
  1. scala> sql("SET spark.sql.codegen.aggregate.splitAggregateFunc.enabled=true")
  2. scala> spark.range(10).selectExpr("id a", "id b", "id c").write.saveAsTable("t")
  3. scala> import org.apache.spark.sql.execution.debug._
  4. scala> sql("SELECT SUM(a + b), SUM(a + b + c) FROM t").debugCodegen
  5.  
  6. == Subtree 2 / 2 ==
  7. *(1) HashAggregate(keys=[], functions=[partial_sum((a#21L + b#22L)), partial_sum(((a#21L + b#22L) + c#23L))], output=[sum#63L, sum#64L])
  8. +- *(1) ColumnarToRow
  9. +- FileScan parquet default.t[a#21L,b#22L,c#23L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint,c:bigint>
  10.  
  11. Generated code:
  12. /* 001 */ public Object generate(Object[] references) { // Entry point: wraps `references` in a new GeneratedIteratorForCodegenStage1 and returns it.
  13. /* 002 */ return new GeneratedIteratorForCodegenStage1(references);
  14. /* 003 */ }
  15. /* 004 */
  16. /* 005 */ // codegenStageId=1
  17. /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
  18. /* 007 */ private Object[] references; // constructor argument; entries [0]..[3] are cast to SQLMetric where they are used
  19. /* 008 */ private scala.collection.Iterator[] inputs; // assigned in init()
  20. /* 009 */ private boolean agg_initAgg_0; // set to true by processNext() so its loop body runs only once
  21. /* 010 */ private boolean agg_bufIsNull_0; // buffer slot 0 null flag, updated by agg_doAggregateVal_coalesce_0
  22. /* 011 */ private long agg_bufValue_0; // buffer slot 0 value
  23. /* 012 */ private boolean agg_bufIsNull_1; // buffer slot 1 null flag, updated by agg_doAggregateVal_coalesce_1
  24. /* 013 */ private long agg_bufValue_1; // buffer slot 1 value
  25. /* 014 */ private int columnartorow_batchIdx_0; // row offset within the current batch: 0 after a fetch, numRows once the batch is consumed
  26. /* 015 */ private boolean agg_agg_isNull_8_0; // this flag and the three below are written only inside agg_doAggregateVal_coalesce_0 / _1
  27. /* 016 */ private boolean agg_agg_isNull_10_0;
  28. /* 017 */ private boolean agg_agg_isNull_15_0;
  29. /* 018 */ private boolean agg_agg_isNull_17_0;
  30. /* 019 */ private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[3]; // columns 0..2 of the current batch, refreshed by columnartorow_nextBatch_0()
  31. /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3]; // row writers created in init(); only [2] is referenced after that
  32. /* 021 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1]; // [0] is the current batch, or null when none is loaded
  33. /* 022 */ private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1]; // [0] is inputs[0], the iterator batches are pulled from
  34. /* 023 */
  35. /* 024 */ public GeneratedIteratorForCodegenStage1(Object[] references) { // Only stores `references`; the remaining state is set up in init().
  36. /* 025 */ this.references = references;
  37. /* 026 */ }
  38. /* 027 */
  39. /* 028 */ public void init(int index, scala.collection.Iterator[] inputs) { // Records the partition index and the inputs, then creates the row writers.
  40. /* 029 */ partitionIndex = index; // not declared in this class; presumably inherited from BufferedRowIterator
  41. /* 030 */ this.inputs = inputs;
  42. /* 031 */
  43. /* 032 */ columnartorow_mutableStateArray_0[0] = inputs[0]; // the iterator columnartorow_nextBatch_0() pulls ColumnarBatch objects from
  44. /* 033 */ columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0); // writers [0] and [1] are not referenced anywhere else in this class
  45. /* 034 */ columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
  46. /* 035 */ columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0); // writer [2] is the one processNext() uses to write ordinals 0 and 1 of the result row
  47. /* 036 */
  48. /* 037 */ }
  49. /* 038 */
  50. /* 039 */ private void agg_doAggregateVal_coalesce_0(long agg_expr_0_0, boolean agg_localBufIsNull_0, long agg_localBufValue_0, boolean agg_exprIsNull_0_0, boolean agg_exprIsNull_1_0, long agg_value_5, long agg_expr_1_0, boolean agg_isNull_5) throws java.io.IOException {
  51. /* 040 */ // evaluate aggregate function: new buffer 0 = (local buffer, or 0 if it is null) + agg_value_5; if agg_value_5 is null the local buffer is kept as is. Only agg_localBufIsNull_0, agg_localBufValue_0, agg_value_5 and agg_isNull_5 are read; the other parameters are unused in this body.
  52. /* 041 */ agg_agg_isNull_8_0 = true;
  53. /* 042 */ long agg_value_8 = -1L;
  54. /* 043 */ do { // single-pass block: each `continue` jumps to the `while (false)` test and therefore leaves the block
  55. /* 044 */ boolean agg_isNull_9 = true;
  56. /* 045 */ long agg_value_9 = -1L;
  57. /* 046 */ agg_agg_isNull_10_0 = true;
  58. /* 047 */ long agg_value_10 = -1L;
  59. /* 048 */ do { // inner single-pass block: agg_value_10 = local buffer value if non-null, otherwise 0
  60. /* 049 */ if (!agg_localBufIsNull_0) {
  61. /* 050 */ agg_agg_isNull_10_0 = false;
  62. /* 051 */ agg_value_10 = agg_localBufValue_0;
  63. /* 052 */ continue;
  64. /* 053 */ }
  65. /* 054 */
  66. /* 055 */ boolean agg_isNull_12 = false;
  67. /* 056 */ long agg_value_12 = -1L;
  68. /* 057 */ if (!false) { // constant condition: the literal 0 is always assigned
  69. /* 058 */ agg_value_12 = (long) 0;
  70. /* 059 */ }
  71. /* 060 */ if (!agg_isNull_12) {
  72. /* 061 */ agg_agg_isNull_10_0 = false;
  73. /* 062 */ agg_value_10 = agg_value_12;
  74. /* 063 */ continue;
  75. /* 064 */ }
  76. /* 065 */
  77. /* 066 */ } while (false);
  78. /* 067 */
  79. /* 068 */ if (!agg_isNull_5) { // add only when the input value agg_value_5 is non-null
  80. /* 069 */ agg_isNull_9 = false; // resultCode could change nullability.
  81. /* 070 */
  82. /* 071 */ agg_value_9 = agg_value_10 + agg_value_5;
  83. /* 072 */
  84. /* 073 */ }
  85. /* 074 */ if (!agg_isNull_9) { // a sum was computed: it becomes the result
  86. /* 075 */ agg_agg_isNull_8_0 = false;
  87. /* 076 */ agg_value_8 = agg_value_9;
  88. /* 077 */ continue;
  89. /* 078 */ }
  90. /* 079 */
  91. /* 080 */ if (!agg_localBufIsNull_0) { // no sum: fall back to the previous buffer value; if that is null too, the result stays null
  92. /* 081 */ agg_agg_isNull_8_0 = false;
  93. /* 082 */ agg_value_8 = agg_localBufValue_0;
  94. /* 083 */ continue;
  95. /* 084 */ }
  96. /* 085 */
  97. /* 086 */ } while (false);
  98. /* 087 */ // update aggregation buffer
  99. /* 088 */ agg_bufIsNull_0 = agg_agg_isNull_8_0;
  100. /* 089 */ agg_bufValue_0 = agg_value_8;
  101. /* 090 */
  102. /* 091 */ }
  103. /* 092 */
  104. /* 093 */ private void agg_doAggregateWithoutKey_0() throws java.io.IOException { // Resets both buffer slots to null, then passes every row of every input batch to agg_doConsume_0.
  105. /* 094 */ // initialize aggregation buffer
  106. /* 095 */ agg_bufIsNull_0 = true;
  107. /* 096 */ agg_bufValue_0 = -1L;
  108. /* 097 */ agg_bufIsNull_1 = true;
  109. /* 098 */ agg_bufValue_1 = -1L;
  110. /* 099 */
  111. /* 100 */ if (columnartorow_mutableStateArray_1[0] == null) { // no current batch yet: try to fetch the first one
  112. /* 101 */ columnartorow_nextBatch_0();
  113. /* 102 */ }
  114. /* 103 */ while ( columnartorow_mutableStateArray_1[0] != null) { // ends once columnartorow_nextBatch_0() finds no further batch and the slot stays null
  115. /* 104 */ int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
  116. /* 105 */ int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0; // number of rows still to process in this batch
  117. /* 106 */ for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
  118. /* 107 */ int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
  119. /* 108 */ boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
  120. /* 109 */ long columnartorow_value_0 = columnartorow_isNull_0 ? -1L : (columnartorow_mutableStateArray_2[0].getLong(columnartorow_rowIdx_0)); // -1L is a placeholder used when the column value is null
  121. /* 110 */ boolean columnartorow_isNull_1 = columnartorow_mutableStateArray_2[1].isNullAt(columnartorow_rowIdx_0);
  122. /* 111 */ long columnartorow_value_1 = columnartorow_isNull_1 ? -1L : (columnartorow_mutableStateArray_2[1].getLong(columnartorow_rowIdx_0));
  123. /* 112 */ boolean columnartorow_isNull_2 = columnartorow_mutableStateArray_2[2].isNullAt(columnartorow_rowIdx_0);
  124. /* 113 */ long columnartorow_value_2 = columnartorow_isNull_2 ? -1L : (columnartorow_mutableStateArray_2[2].getLong(columnartorow_rowIdx_0));
  125. /* 114 */
  126. /* 115 */ agg_doConsume_0(columnartorow_value_0, columnartorow_isNull_0, columnartorow_value_1, columnartorow_isNull_1, columnartorow_value_2, columnartorow_isNull_2);
  127. /* 116 */ // shouldStop check is eliminated
  128. /* 117 */ }
  129. /* 118 */ columnartorow_batchIdx_0 = columnartorow_numRows_0; // mark the whole batch as consumed
  130. /* 119 */ columnartorow_mutableStateArray_1[0] = null; // clear the slot so the loop ends if no further batch is loaded
  131. /* 120 */ columnartorow_nextBatch_0();
  132. /* 121 */ }
  133. /* 122 */
  134. /* 123 */ }
  135. /* 124 */
  136. /* 125 */ private void agg_doAggregateVal_coalesce_1(long agg_expr_0_0, boolean agg_exprIsNull_0_0, boolean agg_exprIsNull_2_0, long agg_expr_2_0, boolean agg_localBufIsNull_1, boolean agg_exprIsNull_1_0, long agg_value_5, long agg_expr_1_0, long agg_localBufValue_1, boolean agg_isNull_5) throws java.io.IOException {
  137. /* 126 */ // evaluate aggregate function: new buffer 1 = (local buffer, or 0 if it is null) + (agg_value_5 + agg_expr_2_0); if either addend is null the local buffer is kept as is. agg_expr_0_0, agg_exprIsNull_0_0, agg_exprIsNull_1_0 and agg_expr_1_0 are unused in this body.
  138. /* 127 */ agg_agg_isNull_15_0 = true;
  139. /* 128 */ long agg_value_15 = -1L;
  140. /* 129 */ do { // single-pass block: each `continue` jumps to the `while (false)` test and therefore leaves the block
  141. /* 130 */ boolean agg_isNull_16 = true;
  142. /* 131 */ long agg_value_16 = -1L;
  143. /* 132 */ agg_agg_isNull_17_0 = true;
  144. /* 133 */ long agg_value_17 = -1L;
  145. /* 134 */ do { // inner single-pass block: agg_value_17 = local buffer value if non-null, otherwise 0
  146. /* 135 */ if (!agg_localBufIsNull_1) {
  147. /* 136 */ agg_agg_isNull_17_0 = false;
  148. /* 137 */ agg_value_17 = agg_localBufValue_1;
  149. /* 138 */ continue;
  150. /* 139 */ }
  151. /* 140 */
  152. /* 141 */ boolean agg_isNull_19 = false;
  153. /* 142 */ long agg_value_19 = -1L;
  154. /* 143 */ if (!false) { // constant condition: the literal 0 is always assigned
  155. /* 144 */ agg_value_19 = (long) 0;
  156. /* 145 */ }
  157. /* 146 */ if (!agg_isNull_19) {
  158. /* 147 */ agg_agg_isNull_17_0 = false;
  159. /* 148 */ agg_value_17 = agg_value_19;
  160. /* 149 */ continue;
  161. /* 150 */ }
  162. /* 151 */
  163. /* 152 */ } while (false);
  164. /* 153 */ boolean agg_isNull_21 = true; // agg_value_21 reuses the precomputed agg_value_5 and adds agg_expr_2_0 to it
  165. /* 154 */ long agg_value_21 = -1L;
  166. /* 155 */
  167. /* 156 */ if (!agg_isNull_5) {
  168. /* 157 */ if (!agg_exprIsNull_2_0) { // both operands must be non-null, otherwise agg_value_21 stays null
  169. /* 158 */ agg_isNull_21 = false; // resultCode could change nullability.
  170. /* 159 */
  171. /* 160 */ agg_value_21 = agg_value_5 + agg_expr_2_0;
  172. /* 161 */
  173. /* 162 */ }
  174. /* 163 */
  175. /* 164 */ }
  176. /* 165 */ if (!agg_isNull_21) {
  177. /* 166 */ agg_isNull_16 = false; // resultCode could change nullability.
  178. /* 167 */
  179. /* 168 */ agg_value_16 = agg_value_17 + agg_value_21;
  180. /* 169 */
  181. /* 170 */ }
  182. /* 171 */ if (!agg_isNull_16) { // a sum was computed: it becomes the result
  183. /* 172 */ agg_agg_isNull_15_0 = false;
  184. /* 173 */ agg_value_15 = agg_value_16;
  185. /* 174 */ continue;
  186. /* 175 */ }
  187. /* 176 */
  188. /* 177 */ if (!agg_localBufIsNull_1) { // no sum: fall back to the previous buffer value; if that is null too, the result stays null
  189. /* 178 */ agg_agg_isNull_15_0 = false;
  190. /* 179 */ agg_value_15 = agg_localBufValue_1;
  191. /* 180 */ continue;
  192. /* 181 */ }
  193. /* 182 */
  194. /* 183 */ } while (false);
  195. /* 184 */ // update aggregation buffer
  196. /* 185 */ agg_bufIsNull_1 = agg_agg_isNull_15_0;
  197. /* 186 */ agg_bufValue_1 = agg_value_15;
  198. /* 187 */
  199. /* 188 */ }
  200. /* 189 */
  201. /* 190 */ private void columnartorow_nextBatch_0() throws java.io.IOException { // Loads the next batch from the input iterator if there is one; otherwise changes nothing.
  202. /* 191 */ if (columnartorow_mutableStateArray_0[0].hasNext()) {
  203. /* 192 */ columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
  204. /* 193 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numInputBatches */).add(1);
  205. /* 194 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
  206. /* 195 */ columnartorow_batchIdx_0 = 0; // start reading the new batch from its first row
  207. /* 196 */ columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0); // cache the three column vectors that the row loop reads
  208. /* 197 */ columnartorow_mutableStateArray_2[1] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(1);
  209. /* 198 */ columnartorow_mutableStateArray_2[2] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(2);
  210. /* 199 */
  211. /* 200 */ }
  212. /* 201 */ }
  213. /* 202 */
  214. /* 203 */ private void agg_doConsume_0(long agg_expr_0_0, boolean agg_exprIsNull_0_0, long agg_expr_1_0, boolean agg_exprIsNull_1_0, long agg_expr_2_0, boolean agg_exprIsNull_2_0) throws java.io.IOException { // Handles one input row: snapshots both buffers, computes the shared sum once, then updates each buffer.
  215. /* 204 */ // do aggregate
  216. /* 205 */ // copy aggregation buffer to the local
  217. /* 206 */ boolean agg_localBufIsNull_0 = agg_bufIsNull_0;
  218. /* 207 */ long agg_localBufValue_0 = agg_bufValue_0;
  219. /* 208 */ boolean agg_localBufIsNull_1 = agg_bufIsNull_1;
  220. /* 209 */ long agg_localBufValue_1 = agg_bufValue_1;
  221. /* 210 */ // common sub-expressions
  222. // NOTE: `agg_value_5` is the variable that holds the common sub-expression `a + b`.
  223. // This variable is detected by `CodeGenerator.getLocalInputVariableValues` and
  224. // passed (together with its null flag `agg_isNull_5`) into the split aggregate
  225. // functions `agg_doAggregateVal_coalesce_0` and `agg_doAggregateVal_coalesce_1` below:
  226. /* 211 */ boolean agg_isNull_5 = true;
  227. /* 212 */ long agg_value_5 = -1L;
  228. /* 213 */
  229. /* 214 */ if (!agg_exprIsNull_0_0) {
  230. /* 215 */ if (!agg_exprIsNull_1_0) { // the sum is non-null only when both operands are non-null
  231. /* 216 */ agg_isNull_5 = false; // resultCode could change nullability.
  232. /* 217 */
  233. /* 218 */ agg_value_5 = agg_expr_0_0 + agg_expr_1_0;
  234. /* 219 */
  235. /* 220 */ }
  236. /* 221 */
  237. /* 222 */ }
  238. /* 223 */ // process aggregate functions to update aggregation buffer
  239. /* 224 */ agg_doAggregateVal_coalesce_0(agg_expr_0_0, agg_localBufIsNull_0, agg_localBufValue_0, agg_exprIsNull_0_0, agg_exprIsNull_1_0, agg_value_5, agg_expr_1_0, agg_isNull_5);
  240. /* 225 */ agg_doAggregateVal_coalesce_1(agg_expr_0_0, agg_exprIsNull_0_0, agg_exprIsNull_2_0, agg_expr_2_0, agg_localBufIsNull_1, agg_exprIsNull_1_0, agg_value_5, agg_expr_1_0, agg_localBufValue_1, agg_isNull_5);
  241. /* 226 */
  242. /* 227 */ }
  243. /* 228 */
  244. /* 229 */ protected void processNext() throws java.io.IOException { // Runs the whole aggregation once and appends a single result row holding both buffer values.
  245. /* 230 */ while (!agg_initAgg_0) { // the flag is set on the next line, so this body executes at most once
  246. /* 231 */ agg_initAgg_0 = true;
  247. /* 232 */ long agg_beforeAgg_0 = System.nanoTime();
  248. /* 233 */ agg_doAggregateWithoutKey_0();
  249. /* 234 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* aggTime */).add((System.nanoTime() - agg_beforeAgg_0) / 1000000); // nanoseconds / 1000000 = elapsed milliseconds
  250. /* 235 */
  251. /* 236 */ // output the result
  252. /* 237 */
  253. /* 238 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
  254. /* 239 */ columnartorow_mutableStateArray_3[2].reset();
  255. /* 240 */
  256. /* 241 */ columnartorow_mutableStateArray_3[2].zeroOutNullBytes();
  257. /* 242 */
  258. /* 243 */ if (agg_bufIsNull_0) { // a null buffer is written as a null field at ordinal 0
  259. /* 244 */ columnartorow_mutableStateArray_3[2].setNullAt(0);
  260. /* 245 */ } else {
  261. /* 246 */ columnartorow_mutableStateArray_3[2].write(0, agg_bufValue_0);
  262. /* 247 */ }
  263. /* 248 */
  264. /* 249 */ if (agg_bufIsNull_1) { // same handling for buffer slot 1 at ordinal 1
  265. /* 250 */ columnartorow_mutableStateArray_3[2].setNullAt(1);
  266. /* 251 */ } else {
  267. /* 252 */ columnartorow_mutableStateArray_3[2].write(1, agg_bufValue_1);
  268. /* 253 */ }
  269. /* 254 */ append((columnartorow_mutableStateArray_3[2].getRow())); // append(...) is not defined in this class; presumably inherited from BufferedRowIterator
  270. /* 255 */ }
  271. /* 256 */ }
  272. /* 257 */
  273. /* 258 */ }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement