package com.comarch.iaa.bd.aggregator.coordinator

import akka.Done
import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.comarch.iaa.bd.aggregator._
import com.comarch.iaa.bd.aggregator.calculation.{AggregationConfiguration, AggregationProcessingActor}
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator._
import com.comarch.iaa.bd.aggregator.properties.ApplicationProperties._
import com.comarch.iaa.bd.aggregator.properties.{ApplicationProperties, KafkaProperties}
import com.comarch.iaa.bd.cudevent.{CUDEvent, OperationType}
import com.comarch.iaa.bd.dataingest.stats.ETLStatsDTO
import com.comarch.iaa.bd.servicescore.rest.RestClient
import com.comarch.iaa.bd.servicescore.rest.RestClient.ToResponse
import com.comarch.iaa.metadata.model.warehouse.etlprocess.{AggregationETLProcess, RawETLProcess}
import com.comarch.iaa.metadata.model.warehouse.fact.Fact
import com.comarch.oss.bd.kafka.{GenericKafkaConsumer, KafkaProducerFactory, ProtoBufDeserializer}
import com.comarch.oss.bd.utils.FunctionRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.Deserializer

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.Try
object AggregationCoordinator {

  sealed trait CUDEventType
  case object ETLProcessCUD extends CUDEventType
  case object AggregationETLProcessCUD extends CUDEventType
  case object AggregatedFactPartitionCUD extends CUDEventType

  case object DeleteKuduTable
}
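
// Root coordinator of the aggregation module: keeps an in-memory mirror of the
// ETL and aggregation metadata fetched over REST, consumes ETL statistics from
// Kafka, and owns one AggregationProcessingActor per aggregation process.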
class AggregationCoordinator extends Actor with AggregationConfiguration with GenericKafkaConsumer[ETLStatsDTO]
  with KafkaProducerFactory with FunctionRetry {

  private val aggregationsPath = "aggregation-processes"
  private val etlProcessesPath = "etl-processes"
  private val relationQueryParam = "relation"

  private val aggregationProcessIdToActorRef: mutable.Map[AggregationProcessId, ActorRef] = mutable.Map[AggregationProcessId, ActorRef]()
  private val etlProcesses: mutable.Map[FactId, Fact] = mutable.Map[FactId, Fact]()
  private val etlProcessIdToFactId: mutable.Map[ETLProcessId, FactId] = mutable.Map[ETLProcessId, FactId]()

  private val producer: KafkaProducer[String, Array[Byte]] = KafkaProducerFactory.generate()
  private implicit val timeout: Timeout = Timeout.durationToTimeout(aggregatorCalculationTimeout)

  context.actorOf(Props(classOf[ETLChangeMonitor]))
  context.actorOf(Props(classOf[AggregationsChangeMonitor]))
  context.actorOf(Props(classOf[AggregatedFactPartitionChangeMonitor]))

  override def parallelism: Int = aggregatorParallelism
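
  // Three kinds of input: CUD events forwarded by the change monitors, ETL
  // statistics from the Kafka consumer, and anything else (logged and dropped).
  // Note the Try: a failing handler is swallowed so a malformed event cannot
  // stall the consumer, and Done is acknowledged either way.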
  override def receive: Receive = {
    case (cudEvent: CUDEvent, cudEventType: CUDEventType) =>
      Try {
        cudEventType match {
          case ETLProcessCUD =>
            cudEvent.operationType match {
              case OperationType.CREATE | OperationType.UPDATE =>
                addNewEtlProcesses(getETLProcess(cudEvent.id).toVector)
              case OperationType.DELETE =>
                etlProcessIdToFactId.remove(cudEvent.id).foreach(etlProcesses.remove)
            }
          case AggregationETLProcessCUD =>
            getAggregationProcess(cudEvent.id).foreach(handleAggregationWithCUDEvent(cudEvent, _))
          case AggregatedFactPartitionCUD =>
            getAggregationProcessByFactTableId(cudEvent.id).foreach(handleAggregationWithCUDEvent(cudEvent, _))
        }
      }
      sender() ! Done
    case etlStatsDTO: ETLStatsDTO =>
      etlProcessIdToFactId.get(etlStatsDTO.contextId) match {
        case Some(factId) =>
          if (etlStatsDTO.countLoaded == 0 || etlStatsDTO.outputFiles.isEmpty) {
            log.warning(s"Skipping calculation because countLoaded equals 0 or there are no output files: $etlStatsDTO")
          } else {
            producer.sendMessage(
              s"${KafkaProperties.aggregationCalculationTopicName}_$factId",
              AggregationDTO(etlStatsDTO.firstRowTime, etlStatsDTO.lastRowTime, factId, etlStatsDTO.outputFiles, Seq.empty, etlStatsDTO.logTime)
            )
          }
        case None => log.warning(s"Cannot find configuration for etlStats = $etlStatsDTO")
      }
      sender() ! Done
    case _ => log.warning("Unexpected message")
  }
  private def handleAggregationWithCUDEvent(cudEvent: CUDEvent, aggregationProcess: AggregationETLProcess): Unit = {
    cudEvent.operationType match {
      case OperationType.CREATE | OperationType.UPDATE =>
        log.info(s"CUDEvent: ${cudEvent.operationType} has been delivered. " +
          s"Restart of the AggregationProcessingActor for aggregation process id: ${aggregationProcess.id} has begun.")
        stopAggregationProcess(aggregationProcess.id)
        startAggregationProcess(Vector(aggregationProcess))
      case OperationType.DELETE =>
        log.info(s"CUDEvent: ${cudEvent.operationType} has been delivered. " +
          s"The AggregationProcessingActor for aggregation process id: ${aggregationProcess.id} will be stopped.")
        aggregationProcessIdToActorRef.get(aggregationProcess.id).foreach(_ ! DeleteKuduTable)
        stopAggregationProcess(aggregationProcess.id)
    }
  }
  private def stopAggregationProcess(aggregationProcessId: AggregationProcessId): Unit = {
    aggregationProcessIdToActorRef.get(aggregationProcessId).foreach { aggregationProcessingActorRef =>
      aggregationProcessingActorRef ! PoisonPill
      log.info(s"PoisonPill has been sent to AggregationProcessingActor with aggregationProcessId: $aggregationProcessId")
    }
    aggregationProcessIdToActorRef -= aggregationProcessId
  }
  override def preStart(): Unit = {
    super.preStart()
    addNewEtlProcesses(getETLProcesses)
    startAggregationProcess(getAggregationProcesses)
  }
  private def addNewEtlProcesses(newEtlProcesses: Vector[RawETLProcess]): Unit = {
    etlProcesses ++= newEtlProcesses.flatMap { etlProcess =>
      etlProcess.fact.foreach(fact => etlProcessIdToFactId += (etlProcess.id -> fact.id))
      etlProcess.fact
    }.map(fact => fact.id -> fact).toMap
  }
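
  // One AggregationProcessingActor is spawned per active aggregation process,
  // with its active facts grouped by the raw fact they are derived from.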
  private def startAggregationProcess(aggregationETLProcesses: Vector[AggregationETLProcess]): Unit = {
    aggregationETLProcesses.filter(_.isActive).foreach { aggregationETLProcess =>
      aggregationETLProcess.facts.filter(_.isActive).groupBy(_.rawFactId).foreach {
        case (rawFactId, relatedFacts) => rawFactId.foreach(createAggregationProcess(_, aggregationETLProcess.id, relatedFacts))
      }
    }
  }
  private def createAggregationProcess(rawFactId: FactId, aggregationProcessId: AggregationProcessId, relatedFacts: Vector[Fact]): Unit = {
    etlProcesses.get(rawFactId).foreach { fact =>
      val actorName = s"AggregationProcessingActor_${fact.id}_$aggregationProcessId"
      aggregationProcessIdToActorRef += (aggregationProcessId -> context.system.actorOf(Props(classOf[AggregationProcessingActor], fact, relatedFacts.toList), actorName))
      // Log only when an actor was actually created (i.e. the raw fact is known).
      log.info(s"AggregationProcessingActor created for aggregationProcessId: $aggregationProcessId")
    }
  }
  override def topicName: String = KafkaProperties.etlEventTopicName

  override def protoBufDeserializer: Deserializer[ETLStatsDTO] = new ProtoBufDeserializer[ETLStatsDTO.ValueType, ETLStatsDTO.type](ETLStatsDTO)

  override def groupId: String = KafkaProperties.etlEventGroupId

  override def computeMessage(consumerRecord: ConsumerRecord[String, ETLStatsDTO]): Future[Any] = {
    log.info(s"Received ${consumerRecord.value()}")
    self ? consumerRecord.value
  }
  def getAggregationProcesses: Vector[AggregationETLProcess] = {
    retry(ApplicationProperties.numberOfRetryAggregationMetadata)(aggregatesFromRest)
  }

  def getAggregationProcess(aggregationId: AggregationProcessId): Option[AggregationETLProcess] = {
    retryWithArg(ApplicationProperties.numberOfRetryAggregationMetadata)(aggregateFromRest, aggregationId)
  }

  def getAggregationProcessByFactTableId(factTableId: FactTableId): Vector[AggregationETLProcess] = {
    retryWithArg(ApplicationProperties.numberOfRetryAggregationMetadata)(aggregateFromRestByFactTableId, factTableId)
  }

  private def aggregatesFromRest(): Vector[AggregationETLProcess] = {
    RestClient.httpRequestWithToken(name = aggregationsPath, params = Seq(relationQueryParam -> "Fact"))
      .decodeWithException[Vector[AggregationETLProcess]].getOrElse(Vector.empty)
  }

  private def aggregateFromRest(aggregateId: AggregationProcessId): Option[AggregationETLProcess] = {
    RestClient.httpRequestWithToken(aggregationsPath, s"/$aggregateId")
      .decodeWithException[AggregationETLProcess]
  }

  private def aggregateFromRestByFactTableId(factTableId: FactTableId): Vector[AggregationETLProcess] = {
    RestClient.httpRequestWithToken(aggregationsPath, params = Seq(("filter_equal.dm_fact_tables.id", factTableId.toString)))
      .decodeWithException[Vector[AggregationETLProcess]].getOrElse(Vector.empty)
  }
  def getETLProcesses: Vector[RawETLProcess] = {
    retry(ApplicationProperties.numberOfRetryAggregationMetadata)(etlProcessesFromRest)
  }

  def getETLProcess(etlProcessId: ETLProcessId): Option[RawETLProcess] = {
    retryWithArg(ApplicationProperties.numberOfRetryAggregationMetadata)(etlProcessFromRest, etlProcessId)
  }

  private def etlProcessesFromRest(): Vector[RawETLProcess] = {
    RestClient.httpRequestWithToken(name = etlProcessesPath, params = Seq(relationQueryParam -> "FactWithColumns"))
      .decodeWithException[Vector[RawETLProcess]].getOrElse(Vector.empty)
  }

  private def etlProcessFromRest(etlProcessId: ETLProcessId): Option[RawETLProcess] = {
    RestClient.httpRequestWithToken(etlProcessesPath, s"/$etlProcessId")
      .decodeWithException[RawETLProcess]
  }
}
-------------------------------------------
package com.comarch.iaa.bd.aggregator.calculation

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.comarch.iaa.bd.aggregator.FactId
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator.DeleteKuduTable
import com.comarch.iaa.bd.aggregator.coordinator.AggregationProcessCoordinator
import com.comarch.iaa.bd.aggregator.properties.ApplicationProperties._
import com.comarch.iaa.bd.aggregator.properties.KuduProperties
import com.comarch.iaa.metadata.model.warehouse.fact.Fact
import com.comarch.oss.bd.Env
import org.apache.kudu.spark.kudu.KuduContext

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
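
// One instance per (source fact, aggregation process) pair. Spawns a supervised
// AggregationProcessCoordinator for every related fact and fans DeleteKuduTable
// requests out to all of them.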
class AggregationProcessingActor(sourceFact: Fact, relatedFacts: List[Fact]) extends Actor
  with ActorLogging
  with AggregationFailureStrategy
  with AggregationConfiguration {

  private implicit val timeout: Timeout = Timeout.durationToTimeout(aggregatorCalculationTimeout)
  private val kuduContext: KuduContext = new KuduContext(KuduProperties.kuduMaster, Env.spark.sparkContext)

  private val factIdToAggregationProcessCoordinator: Map[FactId, ActorRef] = relatedFacts.flatMap { fact =>
    convertToAggregationConfig(findParentFact(fact), fact).map { aggregationConfig =>
      val childName = s"AggregationProcessCoordinator_${fact.id}_${System.currentTimeMillis}"
      fact.id -> startCustomBackoffStrategyWithSupervision(Props(classOf[AggregationProcessCoordinator], kuduContext, aggregationConfig), childName, context.self.path.name)
    }
  }.toMap
  override def receive: Receive = {
    case DeleteKuduTable => handleChildrenResponse(sendDeleteRequest)
    case _ => log.warning("Unexpected message!")
  }

  private def handleChildrenResponse(sendFunction: () => Iterable[Future[Any]]): Unit = {
    Future.sequence(sendFunction()).onComplete {
      case Success(_) => log.info("All tables dropped for source fact id: {}", sourceFact.id)
      case Failure(exception) => log.error(exception, "Cannot drop tables for source fact id: {}", sourceFact.id)
    }
  }

  private def sendDeleteRequest(): Iterable[Future[Any]] = {
    factIdToAggregationProcessCoordinator.values.map(_ ? DeleteKuduTable)
  }

  private def findParentFact(fact: Fact): Fact = {
    relatedFacts.find(relatedFact => fact.parentFactId.contains(relatedFact.id)).getOrElse(sourceFact)
  }

  override def preStart(): Unit = {
    log.info(s"AggregationProcessCoordinator actors started for Facts with ids: ${factIdToAggregationProcessCoordinator.keys}")
  }

  override def postStop(): Unit = {
    log.info("AggregationProcessingActor stopped for source fact id: {}", sourceFact.id)
  }
}
-------------------------------------------
package com.comarch.iaa.bd.aggregator.coordinator

import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import com.comarch.iaa.bd.aggregator.AggregationDTO
import com.comarch.iaa.bd.aggregator.calculation._
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator.DeleteKuduTable
import com.comarch.iaa.bd.aggregator.coordinator.AggregationProcessCoordinator.{AggregationDtoBatch, ExecuteAggregation}
import com.comarch.iaa.bd.aggregator.partitioning.{KuduConfiguration, KuduTableCreator}
import com.comarch.iaa.bd.dataingest.stats.{ETLStatsDTO, Status}
import com.comarch.oss.bd.properties.KafkaCoreProperties
import org.apache.kudu.client.KuduException
import org.apache.kudu.spark.kudu.KuduContext

import scala.util.{Failure, Success, Try}
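
// Guards the calculation pipeline of a single target table: creates the Kudu
// table on startup and, once that succeeds, forwards batches to the
// calculation actor.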
class AggregationProcessCoordinator(val kuduContext: KuduContext, val aggregationConfig: AggregationConfig) extends Actor
  with ActorLogging
  with AggregationConfiguration
  with KuduConfiguration
  with AggregationLogger
  with KuduTableCreator {

  private val targetTableNameWithSchema = aggregationConfig.targetConfig.factTable.tableNameWithSchema
  private val ON_AGGREGATION_HOLD_MESSAGE = s"Aggregation hold! Aggregation of ${context.self.path.name} for $targetTableNameWithSchema won't be performed!"
  private val ON_AGGREGATION_RESTART_MESSAGE = s"Aggregation restart with Backoff strategy! Aggregation of ${context.self.path.name} for $targetTableNameWithSchema will be restarted."

  context.actorOf(Props(classOf[AggregationCalculationKafkaConsumer], aggregationConfig))
  private val aggregationCalculationActor = context.actorOf(Props(classOf[AggregationCalculation], kuduContext, aggregationConfig), s"AggregationCalculationActor_$targetTableNameWithSchema")
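
  // Until preStart succeeds, the default receive below rejects every message;
  // context.become(perform) switches to this handler once the target table exists.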
  private def perform: Receive = {
    case AggregationDtoBatch(data, offset) => executeCalculation(data, offset)
    case DeleteKuduTable => aggregationCalculationActor.tell(DeleteKuduTable, sender())
  }

  override def receive: Receive = {
    case _ => log.warning("Unsupported message")
  }

  def executeCalculation(dataBatch: List[AggregationDTO], offset: Long): Unit = {
    val data = dataBatch.filter(isTimeDefined)
    if (data.nonEmpty) {
      aggregationCalculationActor.tell(ExecuteAggregation(data, offset), sender())
    } else {
      sender() ! Done
    }
  }

  private def isTimeDefined(aggregationDTO: AggregationDTO): Boolean = {
    val isTimeNotSet = aggregationDTO.firstRowTime == 0 || aggregationDTO.lastRowTime == 0
    if (isTimeNotSet) sendAggregationLogs(s"Skipping calculation because firstRowTime or lastRowTime equals 0: $aggregationDTO", aggregationConfig, Status.WARNING)
    !isTimeNotSet
  }
  override def preStart(): Unit = {
    Try {
      aggregationConfigToSchema(aggregationConfig)
      initTableCreation()
    } match {
      case Success(_) => context.become(perform)
      case Failure(exception) =>
        log.error(exception, s"AggregationProcessCoordinator start for $targetTableNameWithSchema has failed!")
        sendFailureAggregationKafkaMessage(exception)
        context.stop(self)
    }
  }

  override def postStop(): Unit = {
    log.info(s"AggregationProcessCoordinator stopped for $targetTableNameWithSchema")
  }
  def sendFailureAggregationKafkaMessage(exception: Throwable): Unit = {
    val onFailureStrategy: String = exception match {
      case _: KuduException => ON_AGGREGATION_HOLD_MESSAGE + s" Reason: ${exception.getMessage}"
      case _: Exception => ON_AGGREGATION_RESTART_MESSAGE + s" Reason: ${exception.getMessage}"
    }
    val statsDTO = ETLStatsDTO(logTime = System.currentTimeMillis, status = Status.FAILURE, errorMessage = onFailureStrategy, contextId = aggregationConfig.targetConfig.factTable.factId)
    kafkaProducer.sendPartitionedMessage(
      KafkaCoreProperties.aggregationStatsTopic,
      KafkaCoreProperties.aggregationStatsPartitions,
      statsDTO,
      Option(statsDTO.contextId.toString)
    )
  }
}
object AggregationProcessCoordinator {
  case class AggregationDtoBatch(data: List[AggregationDTO], offset: Long)
  case class ExecuteAggregation(preprocessedData: List[AggregationDTO], offset: Long)
}
-------------------------------------------
package com.comarch.iaa.bd.aggregator.calculation

import java.sql.Timestamp
import java.time.LocalDateTime

import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.comarch.iaa.bd.aggregator.AggregationDTO
import com.comarch.iaa.bd.aggregator.calculation.AggregationCalculation.ExtractedResult
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator.DeleteKuduTable
import com.comarch.iaa.bd.aggregator.coordinator.AggregationProcessCoordinator.ExecuteAggregation
import com.comarch.iaa.bd.aggregator.impala.ImpalaExternalTable
import com.comarch.iaa.bd.aggregator.partitioning.PartitioningActor.CreatePartitionsFromBatch
import com.comarch.iaa.bd.aggregator.partitioning.{KuduConfiguration, PartitioningActor}
import com.comarch.iaa.bd.aggregator.properties.{KafkaProperties, KuduProperties}
import com.comarch.iaa.bd.dataingest.stats.{ETLStatsDTO, Status}
import com.comarch.iaa.metadata.model.commontypes.TimePartition
import com.comarch.iaa.metadata.model.warehouse.fact.FactType
import com.comarch.oss.bd.Env
import com.comarch.oss.bd.conf.AppProperties
import com.comarch.oss.bd.jdbc.KuduConnectionPool
import com.comarch.oss.bd.kafka.KafkaProducerFactory
import com.comarch.oss.bd.properties.KafkaCoreProperties
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kudu.spark.kudu._
import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, SaveMode}

import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}
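
// Runs one aggregation step with Spark: reads the source rows (from Kudu when
// re-aggregating an aggregate, from the raw ETL output files otherwise),
// groups and aggregates them, and appends the result to the target Kudu table.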
class AggregationCalculation(kuduContext: KuduContext, aggregationConfig: AggregationConfig) extends Actor
  with ActorLogging
  with AggregationConfiguration
  with KafkaProducerFactory
  with AggregationSchemaUpdate
  with KuduConfiguration
  with AggregationLogger {

  private val sourceTableNameWithSchema = aggregationConfig.sourceConfig.factTable.tableNameWithSchema
  private val targetTableNameWithSchema = aggregationConfig.targetConfig.factTable.tableNameWithSchema
  private val kuduSourceOptions: Map[String, String] = kuduOptions(sourceTableNameWithSchema)
  private val kuduTargetOptions: Map[String, String] = kuduOptions(targetTableNameWithSchema)
  private val timestampColumnName = aggregationConfig.sourceConfig.timeColumnName
  private val partitioningActor = context.actorOf(Props(classOf[PartitioningActor], kuduContext, aggregationConfig), s"PartitioningActor_$targetTableNameWithSchema")
  private implicit val timeout: Timeout = Timeout.durationToTimeout(KuduProperties.kuduPartitioningTimeout)
  private val SPARK_BLACKLIST_EXCEPTION_MESSAGE = "Cannot run anywhere due to node and executor blacklist"
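
  // ExecuteAggregation first asks the PartitioningActor to create the Kudu
  // partitions the batch will land in and runs the calculation only on success;
  // DeleteKuduTable drops both the Impala external table and the Kudu table.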
  override def receive: Receive = {
    case ExecuteAggregation(data, offset) =>
      Try {
        Await.result(partitioningActor ? CreatePartitionsFromBatch(data), timeout.duration)
      } match {
        case Success(Done) => handleAggregationCalculation(data, offset)
        // Assumption: the PartitioningActor replies with akka.Done on success and a scala.util.Failure on error.
        case Success(Failure(exception)) => log.error(exception, s"PartitioningActor finished with an error. Calculations for $targetTableNameWithSchema skipped.")
        case Success(other) => log.warning(s"Unexpected response from PartitioningActor: $other. Calculations for $targetTableNameWithSchema skipped.")
        case Failure(exception) => log.error(exception, s"AggregationCalculation: ${context.self.path.name} failed while waiting for a response from its child PartitioningActor. Calculations for $targetTableNameWithSchema skipped.")
      }
      sender() ! Done
    case DeleteKuduTable =>
      Try {
        ImpalaExternalTable.dropTable(targetTableNameWithSchema)(KuduConnectionPool)
        if (kuduContext.tableExists(targetTableNameWithSchema)) {
          kuduContext.deleteTable(targetTableNameWithSchema)
        }
        log.info("Table {} has been dropped.", targetTableNameWithSchema)
      }.recover {
        // Log instead of silently discarding a failed drop; Done is acknowledged either way.
        case exception => log.error(exception, "Cannot drop table {}", targetTableNameWithSchema)
      }
      sender() ! Done
      context.stop(self)
    case _ => log.warning("Unexpected message")
  }
  private def handleAggregationCalculation(aggregationDtoBatch: List[AggregationDTO], offset: Long): Unit = {
    aggregationDtoBatch.foreach { element =>
      Try {
        calculateAggregation(element, offset)
      } match {
        case Success(aggregationStats: ETLStatsDTO) =>
          log.info(s"Aggregation $element calculated successfully for target: ${aggregationConfig.targetConfig.factTable.tableNameWithSchema}")
          produceAggregationKafkaMessages(element, aggregationStats, aggregationConfig, offset)
        case Failure(exception) =>
          sendAggregationLogs(s"Unable to calculate aggregation $element for target: ${aggregationConfig.targetConfig.factTable.tableNameWithSchema}", aggregationConfig, Status.FAILURE, Some(exception))
          exception match {
            // A blacklist error is rethrown and left to the actor's failure strategy; other errors only log.
            case ex: SparkException if ex.getMessage.contains(SPARK_BLACKLIST_EXCEPTION_MESSAGE) => throw ex
            case _ =>
          }
      }
    }
  }
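
  // A timed extract-transform-load pass over a single AggregationDTO; the
  // per-phase durations and row counts are reported as an ETLStatsDTO.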
  private def calculateAggregation(aggregationDTO: AggregationDTO, offset: Long): ETLStatsDTO = {
    val startTime = System.currentTimeMillis
    val firstRowTime = new Timestamp(aggregationDTO.firstRowTime)
    val lastRowTime = new Timestamp(aggregationDTO.lastRowTime)

    val (extractResult: ExtractedResult, extractDuration: Long) = timedExecution(
      readDataFrame(aggregationDTO, firstRowTime.toLocalDateTime, lastRowTime.toLocalDateTime)
    )
    val (dataFrameToWrite: DataFrame, transformDuration: Long) = timedExecution(
      aggregateDataFrame(offset, extractResult.extracted)
    )
    val (countLoaded: Long, loadDuration: Long) = timedExecution(writeDataFrame(dataFrameToWrite))
    extractResult.extracted.unpersist()
    val endTime = System.currentTimeMillis

    ETLStatsDTO(
      countExtracted = extractResult.extractedRowNum,
      timeExtract = extractDuration,
      timeTransform = transformDuration,
      countLoaded = countLoaded,
      timeLoad = loadDuration,
      timeTotal = endTime - startTime,
      firstRowTime = firstRowTime.getTime,
      lastRowTime = lastRowTime.getTime,
      logTime = System.currentTimeMillis,
      contextId = aggregationConfig.targetConfig.factTable.factId,
      status = Status.SUCCESS
    )
  }
  private def writeDataFrame(dataFrame: DataFrame): Long = {
    val countLoaded = dataFrame.count
    dataFrame.write
      .options(kuduTargetOptions)
      .mode(SaveMode.Append)
      .kudu
    dataFrame.unpersist()
    countLoaded
  }

  private def aggregateDataFrame(offset: Long, dataFrame: DataFrame): DataFrame = {
    dataFrame
      .groupBy(aggregationConfig.timeColumn :: aggregationConfig.targetConfig.groupByColumns: _*)
      .agg(countAggregation, aggregationConfig.targetConfig.measureAggregations(aggregationConfig.sourceConfig.factType): _*)
      .withColumn(batchIdFiled.name, batchIdAggregation(offset))
  }
  private def readDataFrame(aggregationDTO: AggregationDTO, firstRowTime: LocalDateTime, lastRowTime: LocalDateTime): ExtractedResult = {
    val dataFrame = (aggregationConfig.sourceConfig.factType match {
      case FactType.Aggregated =>
        Env.spark.read.options(kuduSourceOptions).kudu
          .filter(timeFilterCondition(firstRowTime = firstRowTime, lastRowTime = lastRowTime)
            .and(col(batchIdFiled.name).isInCollection(aggregationDTO.batchIds)))
      case FactType.Raw =>
        val dfCaseSensitive = Env.spark.read.format(AppProperties.etlOutputFormat).load(aggregationDTO.outputFiles: _*)
        dfCaseSensitive.toDF(dfCaseSensitive.columns.map(_.toLowerCase): _*)
    }).cache
    ExtractedResult(dataFrame, dataFrame.count)
  }
  private def produceAggregationKafkaMessages(aggregationDTO: AggregationDTO, aggregationStats: ETLStatsDTO, config: AggregationConfig, offset: Long): Future[RecordMetadata] = {
    kafkaProducer.sendMessage(
      s"${KafkaProperties.aggregationCalculationTopicName}_${config.targetConfig.factTable.factId}",
      aggregationDTO.copy(
        firstRowTime = aggregationConfig.sourceConfig.aggregatePeriod.shiftToStart(aggregationDTO.firstRowTime),
        lastRowTime = aggregationConfig.sourceConfig.aggregatePeriod.shiftToEnd(aggregationDTO.lastRowTime),
        factId = config.targetConfig.factTable.factId,
        batchIds = Seq(offset),
        outputFiles = Seq.empty
      )
    )
    kafkaProducer.sendPartitionedMessage(
      KafkaCoreProperties.aggregationStatsTopic,
      KafkaCoreProperties.aggregationStatsPartitions,
      aggregationStats,
      Option(aggregationStats.contextId.toString)
    )
  }
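
  // Combines an optional predicate on the partition column (timestamps truncated
  // to the source partition unit, enabling partition pruning) with the exact
  // timestamp range on the time column.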
  private def timeFilterCondition(firstRowTime: LocalDateTime, lastRowTime: LocalDateTime): Column = {
    val partitionCondition = aggregationConfig.sourceConfig.timePartitionColumn.map { timePartitionColumn =>
      val sourcePartitionUnit = TimePartition.timePartitionFromColumnNameWithDefault(timePartitionColumn.columnName).unit
      col(timePartitionColumn.columnName) >= lit(firstRowTime.truncatedTo(sourcePartitionUnit).toString) and
        col(timePartitionColumn.columnName) < lit(lastRowTime.truncatedTo(sourcePartitionUnit).toString)
    }
    val conditions = partitionCondition ++ Seq(
      col(timestampColumnName) >= lit(Timestamp.valueOf(firstRowTime)),
      col(timestampColumnName) < lit(Timestamp.valueOf(lastRowTime))
    )
    conditions.reduce(_ and _)
  }
  private def timedExecution[R](codeBlock: => R): (R, Long) = {
    val startTime = System.currentTimeMillis()
    val result = codeBlock
    val completeTime = System.currentTimeMillis()
    (result, completeTime - startTime)
  }

  override def postStop(): Unit = {
    log.info(s"AggregationCalculation for target: ${aggregationConfig.targetConfig.factTable.tableNameWithSchema} stopped.")
  }
}

object AggregationCalculation {
  case class ExtractedResult(extracted: DataFrame, extractedRowNum: Long)
  case class LoadedResult(extracted: DataFrame, extractedRowNum: Long)
}