package com.comarch.iaa.bd.aggregator.coordinator

import akka.Done
import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.comarch.iaa.bd.aggregator._
import com.comarch.iaa.bd.aggregator.calculation.{AggregationConfiguration, AggregationProcessingActor}
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator._
import com.comarch.iaa.bd.aggregator.properties.ApplicationProperties._
import com.comarch.iaa.bd.aggregator.properties.{ApplicationProperties, KafkaProperties}
import com.comarch.iaa.bd.cudevent.{CUDEvent, OperationType}
import com.comarch.iaa.bd.dataingest.stats.ETLStatsDTO
import com.comarch.iaa.bd.servicescore.rest.RestClient
import com.comarch.iaa.bd.servicescore.rest.RestClient.ToResponse
import com.comarch.iaa.metadata.model.warehouse.etlprocess.{AggregationETLProcess, RawETLProcess}
import com.comarch.iaa.metadata.model.warehouse.fact.Fact
import com.comarch.oss.bd.kafka.{GenericKafkaConsumer, KafkaProducerFactory, ProtoBufDeserializer}
import com.comarch.oss.bd.utils.FunctionRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.Deserializer

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.Try

object AggregationCoordinator {

  sealed trait CUDEventType

  case object ETLProcessCUD extends CUDEventType

  case object AggregationETLProcessCUD extends CUDEventType

  case object AggregatedFactPartitionCUD extends CUDEventType

  case object DeleteKuduTable

}

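/**
 * Top-level coordinator: consumes ETL statistics from Kafka, keeps the raw ETL processes and
 * aggregation processes fetched over REST, reacts to metadata CUD events and manages the
 * lifecycle of one AggregationProcessingActor per aggregation process.
 */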
class AggregationCoordinator extends Actor with AggregationConfiguration with GenericKafkaConsumer[ETLStatsDTO]
  with KafkaProducerFactory with FunctionRetry {
  private val aggregationsPath = "aggregation-processes"
  private val etlProcessesPath = "etl-processes"
  private val relationQueryParam = "relation"
  private val aggregationProcessIdToActorRef: mutable.Map[AggregationProcessId, ActorRef] = mutable.Map[AggregationProcessId, ActorRef]()
  private val etlProcesses: mutable.Map[FactId, Fact] = mutable.Map[FactId, Fact]()
  private val etlProcessIdToFactId: mutable.Map[ETLProcessId, FactId] = mutable.Map[ETLProcessId, FactId]()
  private val producer: KafkaProducer[String, Array[Byte]] = KafkaProducerFactory.generate()
  private implicit val timeout: Timeout = Timeout.durationToTimeout(aggregatorCalculationTimeout)

  context.actorOf(Props(classOf[ETLChangeMonitor]))
  context.actorOf(Props(classOf[AggregationsChangeMonitor]))
  context.actorOf(Props(classOf[AggregatedFactPartitionChangeMonitor]))

  override def parallelism: Int = aggregatorParallelism

  override def receive: Receive = {
    case (cudEvent: CUDEvent, cudEventType: CUDEventType) =>
      Try {
        cudEventType match {
          case ETLProcessCUD =>
            cudEvent.operationType match {
              case OperationType.CREATE | OperationType.UPDATE =>
                addNewEtlProcesses(getETLProcess(cudEvent.id).toVector)
              case OperationType.DELETE =>
                etlProcessIdToFactId.remove(cudEvent.id).foreach(etlProcesses.remove)
            }
          case AggregationETLProcessCUD =>
            getAggregationProcess(cudEvent.id).foreach(handleAggregationWithCUDEvent(cudEvent, _))
          case AggregatedFactPartitionCUD =>
            getAggregationProcessByFactTableId(cudEvent.id).foreach(handleAggregationWithCUDEvent(cudEvent, _))
        }
      }
      sender() ! Done

    case etlStatsDTO: ETLStatsDTO =>
      etlProcessIdToFactId.get(etlStatsDTO.contextId) match {
        case Some(factId) =>
          if (etlStatsDTO.countLoaded == 0 || etlStatsDTO.outputFiles.isEmpty) {
            log.warning(s"Skip calculation because count loaded equals 0 or no output files: $etlStatsDTO")
          } else {
            producer.sendMessage(
              s"${KafkaProperties.aggregationCalculationTopicName}_$factId",
              AggregationDTO(etlStatsDTO.firstRowTime, etlStatsDTO.lastRowTime, factId, etlStatsDTO.outputFiles, Seq.empty, etlStatsDTO.logTime)
            )
          }
        case None => log.warning(s"Cannot find configuration for etlStats = $etlStatsDTO")
      }
      sender() ! Done

    case _ => log.warning("Unexpected message")
  }

  private def handleAggregationWithCUDEvent(cudEvent: CUDEvent, aggregationProcess: AggregationETLProcess): Unit = {
    cudEvent.operationType match {
      case OperationType.CREATE | OperationType.UPDATE =>
        log.info(s"CUDEvent: ${cudEvent.operationType} has been delivered. " +
          s"AggregationProcessingActor restart for aggregation process with id: ${aggregationProcess.id} has begun.")
        stopAggregationProcess(aggregationProcess.id)
        startAggregationProcess(Vector(aggregationProcess))
      case OperationType.DELETE =>
        log.info(s"CUDEvent: ${cudEvent.operationType} has been delivered. " +
          s"AggregationProcessingActor for aggregation process id: ${aggregationProcess.id} will be stopped.")
        aggregationProcessIdToActorRef.get(aggregationProcess.id).foreach(_ ! DeleteKuduTable)
        stopAggregationProcess(aggregationProcess.id)
    }
  }

  private def stopAggregationProcess(aggregationProcessId: AggregationProcessId): Unit = {
    aggregationProcessIdToActorRef.get(aggregationProcessId).foreach { aggregationProcessingActorRef =>
      aggregationProcessingActorRef ! PoisonPill
      log.info(s"PoisonPill has been sent to AggregationProcessingActor with aggregationProcessId: $aggregationProcessId")
    }
    aggregationProcessIdToActorRef -= aggregationProcessId
  }

  override def preStart(): Unit = {
    super.preStart()
    addNewEtlProcesses(getETLProcesses)
    startAggregationProcess(getAggregationProcesses)
  }

  private def addNewEtlProcesses(newEtlProcesses: Vector[RawETLProcess]): Unit = {
    etlProcesses ++= newEtlProcesses.flatMap { etlProcess =>
      etlProcess.fact.foreach(fact => etlProcessIdToFactId += (etlProcess.id -> fact.id))
      etlProcess.fact
    }.map(fact => fact.id -> fact).toMap
  }

  private def startAggregationProcess(aggregationETLProcesses: Vector[AggregationETLProcess]): Unit = {
    aggregationETLProcesses.filter(_.isActive).foreach {
      aggregationETLProcess =>
        aggregationETLProcess.facts.filter(_.isActive).groupBy(_.rawFactId).foreach {
          case (rawFactId, relatedFacts) => rawFactId.foreach(createAggregationProcess(_, aggregationETLProcess.id, relatedFacts))
        }
    }
  }

  private def createAggregationProcess(rawFactId: FactId, aggregationProcessId: AggregationProcessId, relatedFacts: Vector[Fact]): Unit = {
    etlProcesses.get(rawFactId).foreach { fact =>
      val actorName = s"AggregationProcessingActor_${fact.id}_$aggregationProcessId"
      aggregationProcessIdToActorRef += (aggregationProcessId -> context.system.actorOf(Props(classOf[AggregationProcessingActor], fact, relatedFacts.toList), actorName))
      log.info(s"AggregationProcessingActor created for aggregationProcessId: $aggregationProcessId")
    }
  }

  override def topicName: String = KafkaProperties.etlEventTopicName

  override def protoBufDeserializer: Deserializer[ETLStatsDTO] = new ProtoBufDeserializer[ETLStatsDTO.ValueType, ETLStatsDTO.type](ETLStatsDTO)

  override def groupId: String = KafkaProperties.etlEventGroupId

  override def computeMessage(consumerRecord: ConsumerRecord[String, ETLStatsDTO]): Future[Any] = {
    log.info(s"Received ${consumerRecord.value()}")
    self ? consumerRecord.value
  }

  def getAggregationProcesses: Vector[AggregationETLProcess] = {
    retry(ApplicationProperties.numberOfRetryAggregationMetadata)(aggregatesFromRest)
  }

  def getAggregationProcess(aggregationId: AggregationProcessId): Option[AggregationETLProcess] = {
    retryWithArg(ApplicationProperties.numberOfRetryAggregationMetadata)(aggregateFromRest, aggregationId)
  }

  def getAggregationProcessByFactTableId(factTableId: FactTableId): Vector[AggregationETLProcess] = {
    retryWithArg(ApplicationProperties.numberOfRetryAggregationMetadata)(aggregateFromRestByFactTableId, factTableId)
  }

  private def aggregatesFromRest(): Vector[AggregationETLProcess] = {
    RestClient.httpRequestWithToken(name = aggregationsPath, params = Seq(relationQueryParam -> "Fact"))
      .decodeWithException[Vector[AggregationETLProcess]].getOrElse(Vector.empty)
  }

  private def aggregateFromRest(aggregateId: AggregationProcessId): Option[AggregationETLProcess] = {
    RestClient.httpRequestWithToken(aggregationsPath, s"/$aggregateId")
      .decodeWithException[AggregationETLProcess]
  }

  private def aggregateFromRestByFactTableId(factTableId: FactTableId): Vector[AggregationETLProcess] = {
    RestClient.httpRequestWithToken(aggregationsPath, params = Seq(("filter_equal.dm_fact_tables.id", factTableId.toString)))
      .decodeWithException[Vector[AggregationETLProcess]].getOrElse(Vector.empty)
  }

  def getETLProcesses: Vector[RawETLProcess] = {
    retry(ApplicationProperties.numberOfRetryAggregationMetadata)(etlProcessesFromRest)
  }

  def getETLProcess(etlProcessId: ETLProcessId): Option[RawETLProcess] = {
    retryWithArg(ApplicationProperties.numberOfRetryAggregationMetadata)(etlProcessFromRest, etlProcessId)
  }

  private def etlProcessesFromRest(): Vector[RawETLProcess] = {
    RestClient.httpRequestWithToken(name = etlProcessesPath, params = Seq(relationQueryParam -> "FactWithColumns"))
      .decodeWithException[List[RawETLProcess]].getOrElse(Vector.empty).toVector
  }

  private def etlProcessFromRest(etlProcessId: ETLProcessId): Option[RawETLProcess] = {
    RestClient.httpRequestWithToken(etlProcessesPath, s"/$etlProcessId")
      .decodeWithException[RawETLProcess]
  }
}

-------------------------------------------

package com.comarch.iaa.bd.aggregator.calculation

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.comarch.iaa.bd.aggregator.FactId
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator.DeleteKuduTable
import com.comarch.iaa.bd.aggregator.coordinator.AggregationProcessCoordinator
import com.comarch.iaa.bd.aggregator.properties.ApplicationProperties._
import com.comarch.iaa.bd.aggregator.properties.KuduProperties
import com.comarch.iaa.metadata.model.warehouse.fact.Fact
import com.comarch.oss.bd.Env
import org.apache.kudu.spark.kudu.KuduContext

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

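/**
 * One instance per source fact and aggregation process: derives an aggregation configuration for
 * each related fact, starts a supervised AggregationProcessCoordinator per configuration and fans
 * DeleteKuduTable requests out to all children, reporting once every table has been dropped.
 */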
class AggregationProcessingActor(sourceFact: Fact, relatedFacts: List[Fact]) extends Actor
  with ActorLogging
  with AggregationFailureStrategy
  with AggregationConfiguration {

  private implicit val timeout: Timeout = Timeout.durationToTimeout(aggregatorCalculationTimeout)
  private val kuduContext: KuduContext = new KuduContext(KuduProperties.kuduMaster, Env.spark.sparkContext)

  private val factIdToAggregationProcessCoordinator: Map[FactId, ActorRef] = relatedFacts.flatMap { fact =>
    convertToAggregationConfig(findParentFact(fact), fact).map { aggregationConfig =>
      val childName = s"AggregationProcessCoordinator_${fact.id}_${System.currentTimeMillis}"
      fact.id -> startCustomBackoffStrategyWithSupervision(Props(classOf[AggregationProcessCoordinator], kuduContext, aggregationConfig), childName, context.self.path.name)
    }
  }.toMap

  override def receive: Receive = {
    case DeleteKuduTable => handleChildrenResponse(sendDeleteRequest)
    case _ => log.warning("Unexpected message!")
  }

  private def handleChildrenResponse(sendFunction: () => Iterable[Future[Any]]): Unit = {
    Future.sequence(sendFunction()).onComplete {
      case Success(_) => log.info("All tables dropped for source fact id: {}", sourceFact.id)
      case Failure(exception) => log.error(exception, "Cannot drop tables for source fact id: {}", sourceFact.id)
    }
  }

  private def sendDeleteRequest(): Iterable[Future[Any]] = {
    factIdToAggregationProcessCoordinator.values.map(_ ? DeleteKuduTable)
  }

  private def findParentFact(fact: Fact): Fact = {
    relatedFacts.find(relatedFact => fact.parentFactId.contains(relatedFact.id)).getOrElse(sourceFact)
  }

  override def preStart(): Unit = {
    log.info(s"AggregationProcessCoordinator actors started for Facts with id: ${factIdToAggregationProcessCoordinator.keys}")
  }

  override def postStop(): Unit = {
    log.info("AggregationProcessingActor stopped for source fact id: {}", sourceFact.id)
  }
}

--------------------------------------

package com.comarch.iaa.bd.aggregator.coordinator

import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import com.comarch.iaa.bd.aggregator.AggregationDTO
import com.comarch.iaa.bd.aggregator.calculation._
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator.DeleteKuduTable
import com.comarch.iaa.bd.aggregator.coordinator.AggregationProcessCoordinator.{AggregationDtoBatch, ExecuteAggregation}
import com.comarch.iaa.bd.aggregator.partitioning.{KuduConfiguration, KuduTableCreator}
import com.comarch.iaa.bd.dataingest.stats.{ETLStatsDTO, Status}
import com.comarch.oss.bd.properties.KafkaCoreProperties
import org.apache.kudu.client.KuduException
import org.apache.kudu.spark.kudu.KuduContext

import scala.util.{Failure, Success, Try}

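/**
 * Prepares the target Kudu table on startup and, once ready, switches to its working behaviour:
 * incoming AggregationDtoBatch messages are filtered and forwarded to the AggregationCalculation
 * child. Startup failures are reported to Kafka and the actor stops itself.
 */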
class AggregationProcessCoordinator(val kuduContext: KuduContext, val aggregationConfig: AggregationConfig) extends Actor
  with ActorLogging
  with AggregationConfiguration
  with KuduConfiguration
  with AggregationLogger
  with KuduTableCreator {

  private val targetTableNameWithSchema = aggregationConfig.targetConfig.factTable.tableNameWithSchema
  private val ON_AGGREGATION_HOLD_MESSAGE = s"Aggregation hold! Aggregation of ${context.self.path.name} for $targetTableNameWithSchema won't be performed!"
  private val ON_AGGREGATION_RESTART_MESSAGE = s"Aggregation restart with Backoff strategy! Aggregation of ${context.self.path.name} for $targetTableNameWithSchema will be restarted."

  context.actorOf(Props(classOf[AggregationCalculationKafkaConsumer], aggregationConfig))
  private val aggregationCalculationActor = context.actorOf(Props(classOf[AggregationCalculation], kuduContext, aggregationConfig), s"AggregationCalculationActor_$targetTableNameWithSchema")

  private def perform: Receive = {
    case AggregationDtoBatch(data, offset) => executeCalculation(data, offset)
    case DeleteKuduTable => aggregationCalculationActor.tell(DeleteKuduTable, sender())
  }

  override def receive: Receive = {
    case _ => log.warning("Unsupported message")
  }

  def executeCalculation(dataBatch: List[AggregationDTO], offset: Long): Unit = {
    val data = dataBatch.filter(isTimeDefined)

    if (data.nonEmpty) {
      aggregationCalculationActor.tell(ExecuteAggregation(data, offset), sender())
    } else {
      sender() ! Done
    }
  }

  private def isTimeDefined(aggregationDTO: AggregationDTO) = {
    val isTimeNotSet = aggregationDTO.firstRowTime == 0 || aggregationDTO.lastRowTime == 0

    if (isTimeNotSet) sendAggregationLogs(s"Skip calculation because firstRowTime or lastRowTime is equal 0: $aggregationDTO", aggregationConfig, Status.WARNING)
    !isTimeNotSet
  }

  override def preStart(): Unit = {
    Try {
      aggregationConfigToSchema(aggregationConfig)
      initTableCreation()
    } match {
      case Success(_) => context.become(perform)
      case Failure(exception) =>
        log.error(exception, s"AggregationProcessCoordinator start $targetTableNameWithSchema has failed!")
        sendFailureAggregationKafkaMessage(exception)
        context.stop(self)
    }
  }

  override def postStop(): Unit = {
    log.info(s"AggregationProcessCoordinator stopped for $targetTableNameWithSchema")
  }

  def sendFailureAggregationKafkaMessage(exception: Throwable): Unit = {
    val onFailureStrategy: String = exception match {
      case _: KuduException => ON_AGGREGATION_HOLD_MESSAGE + s" Reason: ${exception.getMessage}"
      case _: Exception => ON_AGGREGATION_RESTART_MESSAGE + s" Reason: ${exception.getMessage}"
    }

    val statsDTO = ETLStatsDTO(logTime = System.currentTimeMillis, status = Status.FAILURE, errorMessage = onFailureStrategy, contextId = aggregationConfig.targetConfig.factTable.factId)
    kafkaProducer.sendPartitionedMessage(
      KafkaCoreProperties.aggregationStatsTopic,
      KafkaCoreProperties.aggregationStatsPartitions,
      statsDTO,
      Option(statsDTO.contextId.toString)
    )
  }
}

object AggregationProcessCoordinator {

  case class AggregationDtoBatch(data: List[AggregationDTO], offset: Long)

  case class ExecuteAggregation(preprocessedData: List[AggregationDTO], offset: Long)

}

--------------------------------

package com.comarch.iaa.bd.aggregator.calculation

import java.sql.Timestamp
import java.time.LocalDateTime

import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.comarch.iaa.bd.aggregator.AggregationDTO
import com.comarch.iaa.bd.aggregator.calculation.AggregationCalculation.ExtractedResult
import com.comarch.iaa.bd.aggregator.coordinator.AggregationCoordinator.DeleteKuduTable
import com.comarch.iaa.bd.aggregator.coordinator.AggregationProcessCoordinator.ExecuteAggregation
import com.comarch.iaa.bd.aggregator.impala.ImpalaExternalTable
import com.comarch.iaa.bd.aggregator.partitioning.PartitioningActor.CreatePartitionsFromBatch
import com.comarch.iaa.bd.aggregator.partitioning.{KuduConfiguration, PartitioningActor}
import com.comarch.iaa.bd.aggregator.properties.{KafkaProperties, KuduProperties}
import com.comarch.iaa.bd.dataingest.stats.{ETLStatsDTO, Status}
import com.comarch.iaa.metadata.model.commontypes.TimePartition
import com.comarch.iaa.metadata.model.warehouse.fact.FactType
import com.comarch.oss.bd.Env
import com.comarch.oss.bd.conf.AppProperties
import com.comarch.oss.bd.jdbc.KuduConnectionPool
import com.comarch.oss.bd.kafka.KafkaProducerFactory
import com.comarch.oss.bd.properties.KafkaCoreProperties
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kudu.spark.kudu.{KuduContext, _}
import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, SaveMode}

import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}

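/**
 * Runs a single aggregation cycle: asks its PartitioningActor child to create the required Kudu
 * partitions, reads the source data with Spark (raw ETL output files or an already aggregated
 * Kudu table), groups and aggregates it, appends the result to the target Kudu table and
 * publishes both follow-up AggregationDTO messages and calculation statistics to Kafka.
 */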
class AggregationCalculation(kuduContext: KuduContext, aggregationConfig: AggregationConfig) extends Actor
  with ActorLogging
  with AggregationConfiguration
  with KafkaProducerFactory
  with AggregationSchemaUpdate
  with KuduConfiguration
  with AggregationLogger {

  private val sourceTableNameWithSchema = aggregationConfig.sourceConfig.factTable.tableNameWithSchema
  private val targetTableNameWithSchema = aggregationConfig.targetConfig.factTable.tableNameWithSchema
  private val kuduSourceOptions: Map[String, String] = kuduOptions(sourceTableNameWithSchema)
  private val kuduTargetOptions: Map[String, String] = kuduOptions(targetTableNameWithSchema)

  private val timestampColumnName = aggregationConfig.sourceConfig.timeColumnName

  private val partitioningActor = context.actorOf(Props(classOf[PartitioningActor], kuduContext, aggregationConfig), s"PartitioningActor_$targetTableNameWithSchema")

  private implicit val timeout: Timeout = Timeout.durationToTimeout(KuduProperties.kuduPartitioningTimeout)

  private val SPARK_BLACKLIST_EXCEPTION_MESSAGE = "Cannot run anywhere due to node and executor blacklist"

  override def receive: Receive = {
    case ExecuteAggregation(data, offset) =>
      Try {
        Await.result(partitioningActor ? CreatePartitionsFromBatch(data), timeout.duration)
      } match {
        case Success(result) if result.equals(Done) => handleAggregationCalculation(data, offset)
        case Success(result) if result.equals(Failure) => log.info(s"PartitioningActor has ended its work with an error. Calculations for $targetTableNameWithSchema skipped.")
        case Success(other) => log.warning(s"Unexpected response from PartitioningActor: $other. Calculations for $targetTableNameWithSchema skipped.")
        case Failure(exception) => log.error(exception, s"AggregationCalculation: ${context.self.path.name} failed on waiting for response from its child - PartitioningActor. Calculations for $targetTableNameWithSchema skipped.")
      }
      sender() ! Done
    case DeleteKuduTable =>
      Try {
        ImpalaExternalTable.dropTable(targetTableNameWithSchema)(KuduConnectionPool)
        if (kuduContext.tableExists(targetTableNameWithSchema)) {
          kuduContext.deleteTable(targetTableNameWithSchema)
        }
      } match {
        case Success(_) => log.info("Table {} has been dropped.", targetTableNameWithSchema)
        case Failure(exception) => log.error(exception, "Unable to drop table {}", targetTableNameWithSchema)
      }
      sender() ! Done
      context.stop(self)
    case _ => log.warning("Unexpected message")
  }

  private def handleAggregationCalculation(aggregationDtoBatch: List[AggregationDTO], offset: Long): Unit = {
    aggregationDtoBatch.foreach { element =>
      Try {
        calculateAggregation(element, offset)
      } match {
        case Success(aggregationStats: ETLStatsDTO) =>
          log.info(s"Aggregation $element calculated successfully for target: ${aggregationConfig.targetConfig.factTable.tableNameWithSchema}")
          produceAggregationKafkaMessages(element, aggregationStats, aggregationConfig, offset)
        case Failure(exception) =>
          sendAggregationLogs(s"Unable to calculate aggregation $element for target: ${aggregationConfig.targetConfig.factTable.tableNameWithSchema}", aggregationConfig, Status.FAILURE, Some(exception))
          exception match {
            case ex: SparkException if ex.getMessage.contains(SPARK_BLACKLIST_EXCEPTION_MESSAGE) => throw ex
            case _ =>
          }
      }
    }
  }

  private def calculateAggregation(aggregationDTO: AggregationDTO, offset: Long): ETLStatsDTO = {
    val startTime = System.currentTimeMillis

    val firstRowTime = new Timestamp(aggregationDTO.firstRowTime)
    val lastRowTime = new Timestamp(aggregationDTO.lastRowTime)

    val (extractResult: ExtractedResult, extractDuration: Long) = timedExecution(
      readDataFrame(aggregationDTO, firstRowTime.toLocalDateTime, lastRowTime.toLocalDateTime)
    )

    val (dataFrameToWrite: DataFrame, transformDuration: Long) = timedExecution(
      aggregateDataFrame(offset, extractResult.extracted)
    )

    val (countLoaded: Long, loadDuration: Long) = timedExecution(writeDataFrame(dataFrameToWrite))

    extractResult.extracted.unpersist()
    val endTime = System.currentTimeMillis

    ETLStatsDTO(countExtracted = extractResult.extractedRowNum,
      timeExtract = extractDuration,
      timeTransform = transformDuration,
      countLoaded = countLoaded,
      timeLoad = loadDuration,
      timeTotal = endTime - startTime,
      firstRowTime = firstRowTime.getTime,
      lastRowTime = lastRowTime.getTime,
      logTime = System.currentTimeMillis,
      contextId = aggregationConfig.targetConfig.factTable.factId,
      status = Status.SUCCESS)
  }

  private def writeDataFrame(dataFrame: DataFrame): Long = {
    val countLoaded = dataFrame.count
    dataFrame.write
      .options(kuduTargetOptions)
      .mode(SaveMode.Append)
      .kudu

    dataFrame.unpersist()
    countLoaded
  }

  private def aggregateDataFrame(offset: Long, dataFrame: DataFrame) = {
    dataFrame
      .groupBy(aggregationConfig.timeColumn :: aggregationConfig.targetConfig.groupByColumns: _*)
      .agg(countAggregation, aggregationConfig.targetConfig.measureAggregations(aggregationConfig.sourceConfig.factType): _*)
      .withColumn(batchIdFiled.name, batchIdAggregation(offset))
  }

  private def readDataFrame(aggregationDTO: AggregationDTO, firstRowTime: LocalDateTime, lastRowTime: LocalDateTime) = {
    val dataFrame = (aggregationConfig.sourceConfig.factType match {
      case FactType.Aggregated =>
        Env.spark.read.options(kuduSourceOptions).kudu
          .filter(timeFilterCondition(firstRowTime = firstRowTime, lastRowTime = lastRowTime)
            .and(col(batchIdFiled.name).isInCollection(aggregationDTO.batchIds)))
      case FactType.Raw =>
        val dfCaseSensitive = Env.spark.read.format(AppProperties.etlOutputFormat).load(aggregationDTO.outputFiles: _*)
        dfCaseSensitive.toDF(dfCaseSensitive.columns.map(_.toLowerCase): _*)
    }).cache

    ExtractedResult(dataFrame, dataFrame.count)
  }

  private def produceAggregationKafkaMessages(aggregationDTO: AggregationDTO, aggregationStats: ETLStatsDTO, config: AggregationConfig, offset: Long): Future[RecordMetadata] = {
    kafkaProducer.sendMessage(
      s"${KafkaProperties.aggregationCalculationTopicName}_${config.targetConfig.factTable.factId}",
      aggregationDTO.copy(
        firstRowTime = aggregationConfig.sourceConfig.aggregatePeriod.shiftToStart(aggregationDTO.firstRowTime),
        lastRowTime = aggregationConfig.sourceConfig.aggregatePeriod.shiftToEnd(aggregationDTO.lastRowTime),
        factId = config.targetConfig.factTable.factId,
        batchIds = Seq(offset),
        outputFiles = Seq.empty
      )
    )
    kafkaProducer.sendPartitionedMessage(
      KafkaCoreProperties.aggregationStatsTopic,
      KafkaCoreProperties.aggregationStatsPartitions,
      aggregationStats,
      Option(aggregationStats.contextId.toString)
    )
  }

  private def timeFilterCondition(firstRowTime: LocalDateTime, lastRowTime: LocalDateTime): Column = {
    val partitionCondition = aggregationConfig.sourceConfig.timePartitionColumn.map { timePartitionColumn =>
      val sourcePartitionUnit = TimePartition.timePartitionFromColumnNameWithDefault(timePartitionColumn.columnName).unit
      col(timePartitionColumn.columnName) >= lit(firstRowTime.truncatedTo(sourcePartitionUnit).toString) and
        col(timePartitionColumn.columnName) < lit(lastRowTime.truncatedTo(sourcePartitionUnit).toString)
    }

    val conditions = partitionCondition ++ Seq(col(timestampColumnName) >= lit(Timestamp.valueOf(firstRowTime)),
      col(timestampColumnName) < lit(Timestamp.valueOf(lastRowTime)))
    conditions.reduce(_ and _)
  }

  private def timedExecution[R](codeBlock: => R): (R, Long) = {
    val startTime = System.currentTimeMillis()
    val result = codeBlock
    val completeTime = System.currentTimeMillis()
    (result, completeTime - startTime)
  }

  override def postStop(): Unit = {
    log.info(s"AggregationCalculation for target: ${aggregationConfig.targetConfig.factTable.tableNameWithSchema} stopped.")
  }
}

object AggregationCalculation {

  case class ExtractedResult(extracted: DataFrame, extractedRowNum: Long)

  case class LoadedResult(extracted: DataFrame, extractedRowNum: Long)

}