Guest User

Untitled

a guest
Nov 18th, 2017
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.22 KB | None | 0 0
  1. package com.meituan.hotel.oe.snappydata.mbl
  2.  
  3. import org.apache.spark.sql.catalyst.dsl.expressions._
  4. import org.apache.spark.sql.catalyst.expressions._
  5. import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
  6. import org.apache.spark.sql.types._
  7.  
  /**
   * Declarative aggregate that sums "meet / beat / lose" counters element-wise across rows.
   *
   * Input (first child) is an integer array laid out as read by `updateExpressions`:
   *   - consecutive (meet, beat, lose) triples,
   *   - followed by two trailing elements: the start time in seconds and the step in seconds.
   *
   * The aggregation buffer holds a fixed-size long array of running sums plus the number of
   * counter slots, the start time and the step taken from the input rows.
   *
   * The result is an array of structs `(ts, meet, beat, lose)`, one per triple.
   *
   * @param children the aggregate's child expressions; only `children.head` is read
   */
  case class SumMBL(children: Seq[Expression]) extends DeclarativeAggregate {

    // Capacity of the sum buffer: 1024 points, three counters (meet, beat, lose) per point.
    private def numBufferPoints: Int = 1024 * 3

    override def inputTypes: Seq[DataType] = Seq(ArrayType(IntegerType))

    // Element type of the result array. Built once instead of on every `dataType` call.
    private lazy val mblPointType: DataType = new StructType()
      .add("ts", IntegerType) // unix timestamp in seconds
      .add("meet", LongType)
      .add("beat", LongType)
      .add("lose", LongType)

    override def dataType: DataType = ArrayType(mblPointType)

    override def nullable: Boolean = false

    // Running sums, laid out as [meet0, beat0, lose0, meet1, beat1, lose1, ...].
    private lazy val sumPoints =
      AttributeReference("sum_points", ArrayType(LongType), nullable = false)()

    // Number of counter slots in the input (input array length minus the two trailing elements).
    private lazy val numPoints =
      AttributeReference("num_points", IntegerType, nullable = false)()

    private lazy val startTimeInSeconds =
      AttributeReference("start_time_in_seconds", IntegerType, nullable = false)()

    private lazy val stepInSeconds =
      AttributeReference("step_in_seconds", IntegerType, nullable = false)()

    override lazy val aggBufferAttributes: Seq[AttributeReference] =
      sumPoints :: numPoints :: startTimeInSeconds :: stepInSeconds :: Nil

    override lazy val initialValues: Seq[Expression] = Seq(
      {
        val i = UDFUtils.makeIter("sum_mbl_initalValues")
        // The literal's value must be a Long to agree with its declared LongType; a boxed Int
        // paired with LongType is an ill-typed literal.
        GenerateArray(Literal(numBufferPoints), i, Literal(0L, LongType))
      },
      Literal(0), // num_points
      Literal(0), // start_time_in_seconds
      Literal(0)  // step_in_seconds
    )

    override lazy val updateExpressions: Seq[Expression] = {
      val i = UDFUtils.makeIter("sum_mbl_updateExpressions")
      val arraySize = GetArraySize(children.head)
      Seq(
        DoSeq(
          // Presumably iterates i over [0, numBufferPoints) in steps of 3 -- confirm against ForStep.
          // NOTE(review): the loop bound is the buffer capacity, not the input length, so for
          // inputs shorter than the buffer the trailing start/step elements look like they are
          // read as counters here -- confirm whether that is intended.
          ForStep(numBufferPoints, step = 3, i, {
            val meetCount = GetArrayItem(children.head, i)
            val beatCount = GetArrayItem(children.head, i + 1)
            val loseCount = GetArrayItem(children.head, i + 2)

            val meetSum = GetArrayItem(sumPoints, i)
            val beatSum = GetArrayItem(sumPoints, i + 1)
            val loseSum = GetArrayItem(sumPoints, i + 2)

            // Only accumulate a triple when its meet counter is non-negative.
            If (meetCount >= 0,
              Then(
                SetArrayItem(sumPoints, i, meetSum + meetCount),
                SetArrayItem(sumPoints, i + 1, beatSum + beatCount),
                SetArrayItem(sumPoints, i + 2, loseSum + loseCount),
                sumPoints),
              Else(
                sumPoints
              ))
          }),
          sumPoints),
        arraySize - 2, // numPoints = array length - 2
        GetArrayItem(children.head, arraySize - 2), // startTimeInSeconds = second-to-last array element
        GetArrayItem(children.head, arraySize - 1) // stepInSeconds = last array element
      )
    }

    override lazy val mergeExpressions: Seq[Expression] = Seq(
      {
        val i = UDFUtils.makeIter("sum_mbl_mergeExpressions")
        DoSeq(
          ForStep(numBufferPoints, 3, i, {
            val leftMeet = GetArrayItem(sumPoints.left, i + 0)
            val leftBeat = GetArrayItem(sumPoints.left, i + 1)
            val leftLose = GetArrayItem(sumPoints.left, i + 2)

            val rightMeet = GetArrayItem(sumPoints.right, i + 0)
            val rightBeat = GetArrayItem(sumPoints.right, i + 1)
            val rightLose = GetArrayItem(sumPoints.right, i + 2)

            // Add the two partial buffers slot by slot.
            DoSeq(
              SetArrayItem(sumPoints, i, leftMeet + rightMeet),
              SetArrayItem(sumPoints, i + 1, leftBeat + rightBeat),
              SetArrayItem(sumPoints, i + 2, leftLose + rightLose)
            )
          }),
          sumPoints)
      },

      // For the scalar fields, keep the left value unless it is still 0 (the initial value).
      If(numPoints.left === 0, numPoints.right, numPoints.left), // num_points
      If(startTimeInSeconds.left === 0, startTimeInSeconds.right, startTimeInSeconds.left), // start_time_in_seconds
      If(stepInSeconds.left === 0, stepInSeconds.right, stepInSeconds.left) // step_in_seconds
    )

    override lazy val evaluateExpression: Expression = {
      val i = UDFUtils.makeIter("sum_mbl_evaluateExpression")

      val meet = GetArrayItem(sumPoints, i * 3 + 0)
      val beat = GetArrayItem(sumPoints, i * 3 + 1)
      val lose = GetArrayItem(sumPoints, i * 3 + 2)

      // One output struct per (meet, beat, lose) triple; ts = start + (i + 1) * step.
      GenerateArray(numPoints / 3, i,
        CreateLocalStruct(Seq(
          startTimeInSeconds + (i + 1) * stepInSeconds,
          meet,
          beat,
          lose)
        )
      )
    }
  }
Add Comment
Please, Sign In to add comment