package com.enel.beat.exa.core.publishing.jobs

import java.io.File
import java.util
import java.util.UUID

import com.endesa.sime.inject.Handler
import com.enel.beat.exa.bigdata.staging.TxStagingTable
import com.enel.beat.exa.common.dao.consumption.ConsPeriodicClosureDAO
import com.enel.beat.exa.common.model.po.ConsPeriodicClosurePO
import com.enel.beat.exa.common.utils.hdfs.HdfsUtilsImpl
import com.enel.beat.exa.common.utils.property.PropertyManager
import com.enel.beat.exa.core.f2b.util._
import com.enel.beat.exa.core.inventory.response.EquipmentResponse
import com.enel.beat.exa.core.publishing.jobs.utils.CHelper
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.JavaConversions._

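/**
 * Integration test for GenerateC1FileJob: bootstraps the Spark, Zookeeper, Hadoop,
 * HBase and Weld test environments, runs the job with the test SparkContext and
 * checks that the expected CONS_PERIODIC_CLOSURE records are persisted.
 */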
@RunWith(classOf[JUnitRunner])
class GenerateC1FileJobIT extends FlatSpec with BeforeAndAfter with Matchers {

  private var env: IntegrationEnvironment = _
  private var sparkContext: SparkContext = _
  private var weldEnv: WeldEnvironment = _
  private lazy val NumMessages = 10

  before {
    setupEnvironment()
  }

  after {
    // Delete the "Cons Periodic Closure" table.
    ExabeatEnvironmentUtils.cleanHBaseTable(weldEnv, "CONS_PERIODIC_CLOSURE")
  }

  41. "GenerateC1FileJob" should "generate a C1 File on its HDFS route" in {
  42. val hdfs = new Handler[HdfsUtilsImpl].get()
  43. val config = new Handler[PropertyManager].get()
  44. val dao = new Handler[ConsPeriodicClosureDAO].get()
  45. // ACT
  46. GenerateC1FileJob {
  47. def sc(): SparkContext = sparkContext
  48. }
  49.  
  50. // ASSERT
  51. val processed: util.List[ConsPeriodicClosurePO] = dao.findWithoutValidityDate("201201","ES0031500116924002EJ0F","120120131096001501102111")
  52.  
  53. processed should have size NumMessages
  54. assertSavedInformation(processed.get(0))
  55.  
  56.  
  57.  
  58. }
  /* ================================ ENV ================================ */
  def setupEnvironment(): Unit = {
    val sparkEnv = new SparkEnvironment
    val hbaseEnv = new HBaseEnvironment
    weldEnv = new WeldEnvironment

    val env = new IntegrationEnvironment.Builder()
      .addAsyncEnvironment(weldEnv)
      .addSyncEnvironment(sparkEnv)
      .addSyncEnvironment(new ZookeeperEnvironment)
      .addSyncEnvironment(new HadoopEnvironment)
      .addSyncEnvironment(hbaseEnv)
      .bootstrap()

    this.env = env
    this.sparkContext = sparkEnv.getEnv

    hbaseEnv.importDataToHBase(OnlySendTable, onlyCreateTable)
    setupPreviousData()
  }

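  /**
   * Registry-creation strategy that imports only the SQL scripts this test needs:
   * 005-CCM_create.sql and 006_PC_create.sql.
   */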
  private def OnlySendTable = {
    new OnlyRegistryCreationStrategy {

      override def filesToImport(): java.util.List[File] = {
        val toImport = super.filesToImport()
        val sendTableFiles = getFiles(toImport, "005-CCM_create.sql")
        val processLogError = getFiles(toImport, "006_PC_create.sql")
        sendTableFiles ++ processLogError
      }

      private def getFiles(toImport: util.Collection[File], filename: String): List[File] = {
        val sqlFile = toImport.toList.filter(_.getName == filename)
        if (sqlFile.isEmpty) throw new IllegalStateException("not found " + filename)
        sqlFile
      }
    }
  }

  def onlyCreateTable: SimpleSQLFileParser = {
    ExabeatEnvironmentUtils.readByPredicate("Create table CONS_PERIODIC_CLOSURE")
  }

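  /**
   * Prepares the staging area read by GenerateC1FileJob
   * (the RDD write and the COMPLETE marker are currently commented out).
   */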
  private def setupPreviousData(): Unit = {
    val propMan = new Handler[PropertyManager].get()
    val hdfs = new Handler[HdfsUtilsImpl].get()
    val helper = new Handler[CHelper].get()

    // Path to be written
    val staging = new TxStagingTable(propMan.publicationFFormatConsPeriodicClousureC1StagingArea)

    // val rdd: RDD[EquipmentResponse] = sparkContext.parallelize(1 to NumMessages).flatMap(_ =>
    //
    // )
    // val dst = staging.write(rdd)

    // hdfs.emptyFile(dst + "/COMPLETE")
  }

  /* =============================== UTILS =============================== */
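  /** Checks that the key fields of a persisted closure record are populated. */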
  private def assertSavedInformation(sent: ConsPeriodicClosurePO): Unit = {
    sent.getPod should not be null
    sent.getMagnitude should not be null
    sent.getMeasStartYM should not be null
    sent.getMeasStartD should not be null
    sent.getPeriod should not be null
  }
}