import scala.reflect.runtime.universe._

import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.{LongType, StructType, TimestampType}

// Look up the singleton instance of a top-level Scala `object` by its fully qualified name.
def getObjectInstance(mirror: Mirror, clsName: String): ModuleMirror = {
  val module = mirror.staticModule(clsName)
  mirror.reflectModule(module)
}
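// Illustrative check (a sketch, not part of the pipeline): ModuleMirror.instance returns
// the JVM singleton of the named object. "scala.Predef" is an arbitrary example target.
def demoGetObjectInstance(): Unit = {
  val m = runtimeMirror(getClass.getClassLoader)
  val inst = getObjectInstance(m, "scala.Predef").instance
  assert(inst.asInstanceOf[AnyRef] eq Predef)
}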
// Resolve and invoke a function on the `clsName` object by name via runtime reflection,
// picking among overloads by matching the runtime types of `params`.
def reflectSQLFunction(clsName: String)(funcName: String)(params: Any*): Column = {
  val mirror = runtimeMirror(getClass.getClassLoader)
  val instanceMirror: InstanceMirror = mirror.reflect(getObjectInstance(mirror, clsName).instance)
  // getType (defined elsewhere) yields each value's type name for overload matching.
  val paramsType = params.map(getType)
  val methods = instanceMirror.symbol.typeSignature.member(TermName(funcName)).asTerm.alternatives
  val targetMethod = methods.filter { method =>
    val methodArgs = method.asInstanceOf[MethodSymbol].paramLists.head.map { symbol =>
      symbol.typeSignature.resultType.toString
    }
    methodArgs == paramsType
  }
  if (targetMethod.size == 1) {
    instanceMirror.reflectMethod(targetMethod.head.asMethod)(params: _*).asInstanceOf[Column]
  } else {
    throw new IllegalStateException(s"Expected exactly one overload of ${funcName} for parameter " +
      s"types ${paramsType}, but found ${targetMethod.size}.")
  }
}
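// Illustrative call (a sketch): assuming AppConstant.SqlObjectFunction is
// "org.apache.spark.sql.functions" and getType returns a value's fully qualified type
// name (e.g. "org.apache.spark.sql.Column"), this resolves the one-Column overload of
// functions.avg and is equivalent to calling org.apache.spark.sql.functions.avg directly.
def demoReflectAvg(): Column = {
  reflectSQLFunction("org.apache.spark.sql.functions")("avg")(new Column("price"))
}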
// Build a windowed (continuous) aggregation over `dataframe` according to the spec:
// select/filter first, cast the event-time field to timestamp, then apply watermark,
// window, optional group-by fields, and the reflected aggregation functions.
def constructContinueAggregation(dataframe: DataFrame,
                                 continuousAggregationSpec: ContinuousAggregationSpec): DataFrame = {
  val eventTimeField = continuousAggregationSpec.timeField
  val windowLength = continuousAggregationSpec.window.length
  val windowSlide = continuousAggregationSpec.window.slide
  val waterMarkTime = continuousAggregationSpec.watermark
  val aggregationFunctions = continuousAggregationSpec.aggregationFunctions
  val groupByFields = continuousAggregationSpec.groupByFields
  val filterExpr = continuousAggregationSpec.filterExpr
  val addedFields = continuousAggregationSpec.addedFields
  val selectAndFilterSQL = constructSelectAndFilterSQL(filterExpr, addedFields)
  dataframe.createOrReplaceTempView(AppConstant.DefaultTable)
  val tmpSelectAndFilterDF = dataframe.sparkSession.sql(selectAndFilterSQL)
  // The event-time column must be a timestamp for withWatermark/window to apply.
  val selectAndFilterDF = tmpSelectAndFilterDF.withColumn(eventTimeField,
    tmpSelectAndFilterDF(eventTimeField).cast(TimestampType))
  val aggregatedDF = (groupByFields.isDefined, aggregationFunctions.isDefined) match {
    case (_, false) =>
      throw new IllegalArgumentException("At least one aggregation function must be specified.")
    case (_, true) =>
      // Partially applied: given a function name, returns a Seq[Any] => Column invoker.
      val sparkAggFunctions: String => Seq[Any] => Column = reflectSQLFunction(AppConstant.SqlObjectFunction)(_)
      val aggregationColumns: Seq[Column] = aggregationFunctions.head.map { aggregationFunction =>
        val funcName = aggregationFunction.func
        val alias = aggregationFunction.alias
        val fieldColumn = Column(parseExpressionFunc(aggregationFunction.field))
        val resColumn = if (funcName.isDefined) {
          // otherParam carries an optional extra argument: a boolean flag or a double value.
          val otherParam: Seq[AnyVal] = aggregationFunction.otherParam match {
            case Some(param) => param.trim match {
              case "true" => Seq(true)
              case "false" => Seq(false)
              case x => Seq(x.toDouble)
            }
            case None => Seq()
          }
          sparkAggFunctions(funcName.head)(Seq(fieldColumn) ++ otherParam)
        } else {
          fieldColumn
        }
        if (alias.isDefined) resColumn.as(alias.head) else resColumn
      }
      val headColumn = aggregationColumns.head
      val tailColumns = aggregationColumns.tail.toArray
      val groupByDF = if (groupByFields.isDefined) {
        val groupByColumns = groupByFields.head.map { field =>
          log.info(s"Group by field is ${field}")
          new Column(field)
        }
        selectAndFilterDF
          .withWatermark(eventTimeField, waterMarkTime)
          .groupBy((List(window(new Column(eventTimeField), windowLength, windowSlide)) ++ groupByColumns): _*)
      } else {
        selectAndFilterDF
          .withWatermark(eventTimeField, waterMarkTime)
          .groupBy(window(new Column(eventTimeField), windowLength, windowSlide))
      }
      groupByDF.agg(headColumn, tailColumns: _*)
  }
  aggregatedDF
}
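// For reference, a minimal hand-written equivalent of what constructContinueAggregation
// builds for a spec with one group-by field and a single count aggregation (a sketch:
// the column names, watermark, and window durations below are assumptions, not taken
// from any real spec).
def exampleWindowedCount(df: DataFrame): DataFrame = {
  import org.apache.spark.sql.functions.{col, count}
  df.withColumn("event_time", col("event_time").cast(TimestampType))
    .withWatermark("event_time", "10 minutes")
    .groupBy(window(col("event_time"), "5 minutes", "1 minute"), col("country"))
    .agg(count(col("country")).as("cnt"))
}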
// Resolve the output schema of the continuous aggregation without running a stream:
// build an empty local relation with the input schema, apply the aggregation, and read
// the resolved attributes off the analyzed logical plan. The window struct is flattened
// into numeric start/end columns and then dropped.
def checkAndGetFinalSchema(structType: StructType, continueAggregationSpec: ContinuousAggregationSpec): Seq[Attribute] = {
  val attributes = structType.fields.map { field =>
    AttributeReference(field.name, field.dataType)()
  }.toSeq
  val logicalPlan = LocalRelation(attributes)
  val encoder: ExpressionEncoder[Row] = RowEncoder(structType)
  val inputDataFrame: Dataset[Row] = Dataset(sparkSession, logicalPlan)(encoder).toDF()
  val aggregation: DataFrame = constructContinueAggregation(inputDataFrame, continueAggregationSpec)
    .withColumn("windowStart", new Column("window.start").cast(LongType))
    .withColumn("windowEnd", new Column("window.end").cast(LongType))
    .drop("window")
  val resolvedPlan = aggregation.logicalPlan
  log.info("Final schema is:")
  log.info(resolvedPlan.output.toString())
  resolvedPlan.output
}
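// Illustrative use (a sketch): derive the post-aggregation schema up front, e.g. for
// validation before the streaming query starts. The input schema and `spec` below are
// assumptions; StructType.fromAttributes converts the attributes back to a StructType.
def demoFinalSchema(spec: ContinuousAggregationSpec): StructType = {
  import org.apache.spark.sql.types.StringType
  val inputSchema = new StructType()
    .add("event_time", StringType)
    .add("country", StringType)
  StructType.fromAttributes(checkAndGetFinalSchema(inputSchema, spec))
}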