Advertisement
Guest User

Untitled

a guest
Mar 28th, 2017
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.10 KB | None | 0 0
  1. import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
  2. import org.apache.hadoop.hive.ql.metadata.HiveException;
  3. import org.apache.hadoop.hive.ql.parse.SemanticException;
  4. import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
  5. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
  6. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  7. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  8. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
  9. import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
  10. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  11. import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
  12. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
  13.  
  14. public class SumInt extends AbstractGenericUDAFResolver {
  15.  
  16. @Override
  17. public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
  18.  
  19. // 파라미터는 하나만 받음
  20. if (info.length != 1) {
  21. throw new UDFArgumentTypeException(info.length - 1, "Exactly one argument is expected.");
  22. }
  23.  
  24. // 파라미터의 카테고리가 프리미티브 타입이 아니면 예외 처리
  25. if (info[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
  26. throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
  27. }
  28.  
  29. // 전달된 파라미터의 타입이 스트링이면 SumStringEvaluator, 아니면 SumIntEvaluator 처리
  30. if (((PrimitiveTypeInfo)info[0]).getPrimitiveCategory() == PrimitiveCategory.STRING) {
  31. return new SumStringEvaluator();
  32. } else if (((PrimitiveTypeInfo)info[0]).getPrimitiveCategory() == PrimitiveCategory.INT) {
  33. return new SumIntEvaluator();
  34. } else {
  35. throw new UDFArgumentTypeException(0, "Only string, int type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
  36. }
  37. }
  38.  
  39.  
  40. /**
  41. * 문자열 int 를 변환하여 sum 하는 Evaluator
  42. *
  43. * @author User
  44. *
  45. */
  46. public static class SumStringEvaluator extends GenericUDAFEvaluator {
  47.  
  48. private PrimitiveObjectInspector inputOI;
  49.  
  50. static class SumAggregationBuffer implements AggregationBuffer {
  51. int sum;
  52. }
  53.  
  54. @Override
  55. public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
  56. super.init(m, parameters);
  57.  
  58. inputOI = (PrimitiveObjectInspector) parameters[0];
  59. return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
  60. }
  61.  
  62. @Override
  63. public AggregationBuffer getNewAggregationBuffer() throws HiveException {
  64. SumAggregationBuffer sum = new SumAggregationBuffer();
  65. reset(sum);
  66. return sum;
  67. }
  68.  
  69. @Override
  70. public void reset(AggregationBuffer agg) throws HiveException {
  71. ((SumAggregationBuffer) agg).sum = 0;
  72. }
  73.  
  74. @Override
  75. public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
  76. // 전달받은 파라미터가 없거나, null 일경우 처리
  77. if(parameters.length != 0 && inputOI.getPrimitiveJavaObject(parameters[0]) != null) {
  78. ((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(parameters[0]).toString());
  79. }
  80. }
  81.  
  82. @Override
  83. public Object terminatePartial(AggregationBuffer agg) throws HiveException {
  84. return ((SumAggregationBuffer) agg).sum;
  85. }
  86.  
  87. @Override
  88. public void merge(AggregationBuffer agg, Object partial) throws HiveException {
  89. ((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(partial).toString());
  90. }
  91.  
  92. @Override
  93. public Object terminate(AggregationBuffer agg) throws HiveException {
  94. return ((SumAggregationBuffer) agg).sum;
  95. }
  96.  
  97. }
  98.  
  99. /**
  100. * int 값을 sum 하는 Evaluator
  101. *
  102. * @author User
  103. *
  104. */
  105. public static class SumIntEvaluator extends GenericUDAFEvaluator {
  106.  
  107. private IntObjectInspector inputOI;
  108.  
  109. static class SumAggregationBuffer implements AggregationBuffer {
  110. int sum;
  111. }
  112.  
  113. @Override
  114. public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
  115. super.init(m, parameters);
  116.  
  117. inputOI = (IntObjectInspector) parameters[0];
  118. return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
  119. }
  120.  
  121. @Override
  122. public AggregationBuffer getNewAggregationBuffer() throws HiveException {
  123. SumAggregationBuffer sum = new SumAggregationBuffer();
  124. reset(sum);
  125. return sum;
  126. }
  127.  
  128. @Override
  129. public void reset(AggregationBuffer agg) throws HiveException {
  130. ((SumAggregationBuffer) agg).sum = 0;
  131. }
  132.  
  133. @Override
  134. public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
  135. ((SumAggregationBuffer) agg).sum += inputOI.get(parameters[0]);
  136. }
  137.  
  138. @Override
  139. public Object terminatePartial(AggregationBuffer agg) throws HiveException {
  140. return ((SumAggregationBuffer) agg).sum;
  141. }
  142.  
  143. @Override
  144. public void merge(AggregationBuffer agg, Object partial) throws HiveException {
  145. ((SumAggregationBuffer) agg).sum += inputOI.get(partial);
  146. }
  147.  
  148. @Override
  149. public Object terminate(AggregationBuffer agg) throws HiveException {
  150. return ((SumAggregationBuffer) agg).sum;
  151. }
  152.  
  153. }
  154. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement