Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
package com.enel.beat.exa.core.publishing.jobs

import java.io.File
import java.util
import java.util.UUID

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

import com.endesa.sime.inject.Handler
import com.enel.beat.exa.bigdata.staging.TxStagingTable
import com.enel.beat.exa.common.dao.consumption.ConsPeriodicClosureDAO
import com.enel.beat.exa.common.model.po.ConsPeriodicClosurePO
import com.enel.beat.exa.common.utils.hdfs.HdfsUtilsImpl
import com.enel.beat.exa.common.utils.property.PropertyManager
import com.enel.beat.exa.core.f2b.util._
import com.enel.beat.exa.core.inventory.response.EquipmentResponse
import com.enel.beat.exa.core.publishing.jobs.utils.CHelper

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
- @RunWith(classOf[JUnitRunner])
/**
 * Integration test for `GenerateC1FileJob`: boots a full local stack
 * (Spark, Zookeeper, Hadoop, HBase, Weld), seeds staging data, runs the
 * job and verifies that the processed rows land in CONS_PERIODIC_CLOSURE.
 */
@RunWith(classOf[JUnitRunner])
class GenerateC1FileJobIT extends FlatSpec with BeforeAndAfter with Matchers {

  // Composite environment built in setupEnvironment(); kept so sub-environments
  // stay reachable for the lifetime of the test.
  private var env: IntegrationEnvironment = _
  private var sparkContext: SparkContext = _
  private var weldEnv: WeldEnvironment = _

  // Number of staged messages the job is expected to publish.
  private lazy val NumMessages = 10

  before {
    setupEnvironment()
  }

  after {
    // Drop the "Cons Periodic Closure" table so each run starts from a clean slate.
    ExabeatEnvironmentUtils.cleanHBaseTable(weldEnv, "CONS_PERIODIC_CLOSURE")
  }

  "GenerateC1FileJob" should "generate a C1 File on its HDFS route" in {
    val dao = new Handler[ConsPeriodicClosureDAO].get()

    // ACT
    // NOTE(review): this passes a Unit-valued block to GenerateC1FileJob.apply;
    // the inner `def sc()` is never visible to the job. The original was likely
    // `new GenerateC1FileJob { override def sc(): SparkContext = sparkContext }.run()`
    // (or similar) and was mangled in transit — confirm against the job's API.
    GenerateC1FileJob {
      def sc(): SparkContext = sparkContext
    }

    // ASSERT
    val processed: util.List[ConsPeriodicClosurePO] =
      dao.findWithoutValidityDate("201201", "ES0031500116924002EJ0F", "120120131096001501102111")
    processed should have size NumMessages
    assertSavedInformation(processed.get(0))
  }

  /**
  ######## ## ## ## ##
  ## ### ## ## ##
  ## #### ## ## ##
  ###### ## ## ## ## ##
  ## ## #### ## ##
  ## ## ### ## ##
  ######## ## ## ###
  */

  /**
   * Boots the integration stack and wires the resulting handles into the
   * class fields, then imports the SQL schema and seeds staging data.
   */
  def setupEnvironment(): Unit = {
    val sparkEnv = new SparkEnvironment
    val hbaseEnv = new HBaseEnvironment
    weldEnv = new WeldEnvironment
    // Renamed from `env` to avoid shadowing the class field of the same name.
    val builtEnv = new IntegrationEnvironment.Builder()
      .addAsyncEnvironment(weldEnv)
      .addSyncEnvironment(sparkEnv)
      .addSyncEnvironment(new ZookeeperEnvironment)
      .addSyncEnvironment(new HadoopEnvironment)
      .addSyncEnvironment(hbaseEnv)
      .bootstrap()
    this.env = builtEnv
    this.sparkContext = sparkEnv.getEnv
    hbaseEnv.importDataToHBase(OnlySendTable, onlyCreateTable)
    setupPreviousData()
  }

  /**
   * Import strategy restricted to the two SQL scripts this test needs
   * (the CCM create script and the Periodic Closure create script).
   */
  private def OnlySendTable = {
    new OnlyRegistryCreationStrategy {
      override def filesToImport(): java.util.List[File] = {
        val toImport = super.filesToImport()
        val sendTableFiles = getFiles(toImport, "005-CCM_create.sql")
        val processLogError = getFiles(toImport, "006_PC_create.sql")
        // Explicit .asJava instead of relying on the deprecated implicit
        // conversions from scala.collection.JavaConversions.
        (sendTableFiles ++ processLogError).asJava
      }

      /** Selects the file with the given name; fails fast if it is missing. */
      private def getFiles(toImport: util.Collection[File], filename: String): List[File] = {
        val sqlFile = toImport.asScala.toList.filter(_.getName == filename)
        if (sqlFile.isEmpty) throw new IllegalStateException("not found " + filename)
        sqlFile
      }
    }
  }

  /** Parser limited to the CONS_PERIODIC_CLOSURE create statement. */
  def onlyCreateTable: SimpleSQLFileParser = {
    ExabeatEnvironmentUtils.readByPredicate("Create table CONS_PERIODIC_CLOSURE")
  }

  /**
   * Seeds the C1 staging area the job reads from.
   * NOTE(review): the RDD seeding below is commented out, so nothing is
   * actually staged and the `NumMessages` assertion in the test cannot pass
   * as written — restore the seed or adjust the assertion.
   */
  private def setupPreviousData(): Unit = {
    val propMan = new Handler[PropertyManager].get()
    val hdfs = new Handler[HdfsUtilsImpl].get()
    val helper = new Handler[CHelper].get()
    // Path to be written
    val staging = new TxStagingTable(propMan.publicationFFormatConsPeriodicClousureC1StagingArea)
    // val rdd: RDD[EquipmentResponse] = sparkContext.parallelize(1 to NumMessages).flatMap(_ =>
    //
    // )
    // val dst = staging.write(rdd)
    // hdfs.emptyFile(dst + "/COMPLETE")
  }

  /**
  ## ## ######## #### ## ######
  ## ## ## ## ## ## ##
  ## ## ## ## ## ##
  ## ## ## ## ## ######
  ## ## ## ## ## ##
  ## ## ## ## ## ## ##
  ####### ## #### ######## ######
  */

  /** Asserts that the mandatory fields of a persisted closure row are populated. */
  private def assertSavedInformation(sent: ConsPeriodicClosurePO): Unit = {
    sent.getPod should not be null
    sent.getMagnitude should not be null
    sent.getMeasStartYM should not be null
    sent.getMeasStartD should not be null
    sent.getPeriod should not be null
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement