import java.io.File

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.joda.time.format.DateTimeFormat
import org.joda.time.{DateTime, DateTimeZone}
import scalikejdbc.NamedDB // presumably scalikejdbc's NamedDB, given the usage below

import scala.util.{Failure, Success}

// Project-internal types (ConfigParser, ConnectionPool, CaptifyLogging, the
// *Conf, *Test and *Source classes, etc.) come from packages not shown in this paste.

object ConfigLoader {
  val conf = ConfigFactory.load()
  // `config` here appears to be a project package/object not included in this paste.
  val configs = new ConfigParser(config.ConfigFromTypesafeConf(config.typesafeConf))

  // One typed configuration section per downstream system.
  val managerConfig: HealthCheckManagerConf = configs.parse[HealthCheckManagerConf]()
  val c3DbConf = configs.parse[C3DBConfig]()
  val aggregatesDbConf = configs.parse[AggregatesDBConfig]()
  val pubDashDBConf = configs.parse[PubDashDBConf]()
  val impalaDbConf = configs.parse[ImpalaDbConf]()
  val feedStateDbConf = configs.parse[FeedStateDBConf]()
  val s3credentials = configs.parse[S3Credentials]()
  val anAuthConf = configs.parse[AppnexusAuthConf]()
  val asyncS3ClientConf = configs.parse[AsyncS3ClientConf]()
  val feedStatsConfig = configs.parse[FeedStatsConfig]()

  // Named connection pools for each database the health checks query.
  val aggregatesConnPool = ConnectionPool("aggregates", aggregatesDbConf)
  val c3ConnPool = ConnectionPool("c3", c3DbConf)
  val pubDashPool = ConnectionPool("pubdash", pubDashDBConf)
  val impalaDbConnPool = ImpalaConnectionPool(impalaDbConf)
  val feedstateConnPool = ConnectionPool("feedstate", feedStateDbConf)

  val anproxyUrl: String = conf.getString("anproxy_url")
  val parquetReadParallelism: Int = conf.getInt("parquet_read_parallelism")
}
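
// Illustration only: ConfigParser.parse[T]() presumably binds a section of the
// Typesafe config to a case class by field name. The class and keys below are
// hypothetical, not part of the real config model, and are shown commented out:
//
//   // application.conf:  example_db { host = "localhost", port = 21050 }
//   case class ExampleDbConf(host: String, port: Int)
//   val exampleDbConf = ConfigLoader.configs.parse[ExampleDbConf]()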

object HealthCheckManager extends CaptifyLogging {

  import ConfigLoader._

  implicit val execCtx = GlobalExecutionContextProvider.get()

  // Local Spark session used to read the Parquet feeds during the checks.
  val sparkConf = new SparkConf().setAppName("health-check").setMaster("local[4]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  val hadoopConf = sparkSession.sparkContext.hadoopConfiguration

  // S3A filesystem and credentials for reading feed files from S3.
  hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  hadoopConf.set("fs.s3a.awsAccessKeyId", s3credentials.accessKey)
  hadoopConf.set("fs.s3a.awsSecretAccessKey", s3credentials.secretKey)

  // GCS connector for reading DBM data-transfer files. The service-account
  // email and keyfile path appear to be redacted in the original paste.
  hadoopConf.set("fs.gs.project.id", "event-feeds-dbm-dt-files")
  hadoopConf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  hadoopConf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
  hadoopConf.set("fs.gs.implicit.dir.repair.enable", "false")
  hadoopConf.set("google.cloud.auth.service.account.enable", "true")
  hadoopConf.set("google.cloud.auth.service.account.email", "...")
  hadoopConf.set("google.cloud.auth.service.account.keyfile", new File("./provision,,,,").getCanonicalPath)

  lazy val ningProvider = new NingAsyncClientProviderImpl()
  lazy val s3EmailManager = new S3EmailManager(sparkSession.sparkContext)

  def main(args: Array[String]): Unit = {
    runFeedStatsCheck
    runOtherDiscrepancyChecks
    sendEmailWithAllDiscrepancies

    // Tear down Spark, the named DB connections, and the async HTTP client.
    sparkSession.stop()
    NamedDB("aggregates").close()
    NamedDB("c3").close()
    NamedDB("pubdash").close()
    NamedDB("impala").close()
    NamedDB("feedstate").close()
    ningProvider.close()
  }
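
  // runFeedStatsCheck is called from main but its definition is not included
  // in this paste; a hypothetical no-op stub is sketched here so the object is
  // self-contained. The real implementation presumably drives feed-level
  // checks off feedStatsConfig.
  def runFeedStatsCheck: Unit =
    logger.warn("runFeedStatsCheck: implementation not included in this paste")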

  // Pairs each API-side metric with the corresponding Impala-side metric for
  // the same day, producing one discrepancy test per (source, day) pair.
  val standardFeedTestList: IndexedSeq[StandardFeedDiscrepancyTest] = {
    val daysBeforeDBM = (1 to 5).map(DateTime.now().minusDays(_).withZone(DateTimeZone.forID("Europe/London")).withTimeAtStartOfDay())
    val daysBeforeAppnexus = (1 to 2).map(DateTime.now().minusDays(_).withZone(DateTimeZone.UTC).withTimeAtStartOfDay())

    lazy val dbmApi = DBMApi

    def getMetricForSource(source: DataSource, date: DateTime) = MetricData(source, date, date.plusDays(1))

    val dbmMetrics = daysBeforeDBM.map(date => MetricData(dbmApi, date, date))

    lazy val appnexusApi = new AppnexusApi()
    val appnexusMetrics = daysBeforeAppnexus.map(date => getMetricForSource(appnexusApi, date))

    lazy val impalaDBM = new ImpalaDbDBM()
    lazy val impalaDbmMetrics = daysBeforeDBM.map(date => getMetricForSource(impalaDBM, date))

    lazy val impalaAppnexus = new ImpalaDbAppnexus()
    lazy val impalaAppnexusMetrics = daysBeforeAppnexus.map(date => getMetricForSource(impalaAppnexus, date))

    ((appnexusMetrics zip impalaAppnexusMetrics) ++
      (dbmMetrics zip impalaDbmMetrics))
      .map { case (m1, m2) => new StandardFeedDiscrepancyTest(m1, m2) }
  }

  def runOtherDiscrepancyChecks: Unit = {

    val dbm = new DbmLineItemSource()
    val aggregates = new AggregatesLineItemSource(Dsp.DBM)

    // Line-item level comparisons between the DBM API and the aggregates DB
    // for each of the last five days.
    val dspLineItemsDBMTests = (1 to 5).map(DateTime.now().minusDays)
      .map(new DspLineItemTest(dbm, aggregates, _))

    val campaignManagerStatsTests = (1 to 2).map(DateTime.now().minusDays)
      .map(new CampaignManagerStatsTest(CampaignManager, ImpalaStandardFeedLineItemsCampaign, _))

    val dictionaryTests = Seq(
      OSAppnexusDictionarySource,
      OSDBMDictionarySource,
      BrowserAppnexusDictionarySource,
      BrowserDBMDictionarySource,
      DeviceAppnexusDictionarySource
    ).map(new DictionaryTest(_))

    // Attribution checks pinned to two fixed historical dates.
    val formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    val days = IndexedSeq(
      "2016-09-16 08:00:00",
      "2016-11-18 08:00:00"
    ).map(
      formatter.parseDateTime(_).withZone(DateTimeZone.UTC).withTimeAtStartOfDay()
    )
    val attributionDiscrepancyTestForSpecificDate = days.map(new AttributionDiscrepancyImpalaTest(_, Dsp.DBM))

    // Yesterday's attribution check, once per DSP.
    val attributionDiscrepancyImpalaTest = for {
      dayBefore <- 1 to 1
      dsp <- Dsp.values
    } yield new AttributionDiscrepancyImpalaTest(DateTime.now().minusDays(dayBefore), dsp)

    val today = DateTime.now()
    val yesterday = DateTime.now().minusDays(1).withZone(DateTimeZone.UTC).withTimeAtStartOfDay()

    // Run every test, write each report to a file, and upload the reports to S3.
    (standardFeedTestList
      //   ++ attributionDiscrepancyTestForSpecificDate
      ++ attributionDiscrepancyImpalaTest
      ++ Seq(
        new RawStandardFeedTest("Appnexus", ANRawStandardFeedSource, ANEDBStandardFeedSource, yesterday, today),
        new RawStandardFeedTest("DBM", DBMRawStandardFeedSource, DBMEDBStandardFeedSource, yesterday, today),
        new VideoFeedTest("Appnexus", VideoFeedApiAppnexusSource, EDBVideoFeedAppnexusSource, yesterday, today),
        new AttributionSummaryImpalaTest(DateTime.now().minusWeeks(1), yesterday, Dsp.Appnexus),
        new AttributionSummaryImpalaTest(DateTime.now().minusWeeks(1), yesterday, Dsp.DBM),
        new ReplicationDiscrepancyTest(),
        new PubdashDiscrepancyTest(new PubDashSchemaSource, new ImpalaDbSource, true),
        new StatsLineItemTest(new StandardFeedLineItemsImpala(), new StatsLineItemTotalsC3(), true),
        new EntityDiscrepancyTest(Campaigns, yesterday),
        new EntityDiscrepancyTest(Profiles, yesterday),
        new DBMEntityTest(sparkSession, LineItemEntity, today),
        new DBMEntityTest(sparkSession, PixelEntity, today),
        new DBMEntityTest(sparkSession, InsertionOrderEntity, today)
      ) ++ dspLineItemsDBMTests
      ++ campaignManagerStatsTests
      ++ dictionaryTests)
      .zipWithIndex
      .flatMap { case (test, i) => test.runAndWriteResultToFile(i) }
      .foreach(s3EmailManager.saveFileToS3)
  }
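
  // Illustration only: the tests above are assumed to share a contract roughly
  // like the hypothetical trait below -- run the comparison and, if a
  // discrepancy is found, write a report file and return it for upload
  // (matching the flatMap/foreach pipeline above):
  //
  //   trait DiscrepancyTest {
  //     def runAndWriteResultToFile(index: Int): Option[File]
  //   }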

  def sendEmailWithAllDiscrepancies: Unit = {
    if (managerConfig.sendEmailEnabled) {
      s3EmailManager.getAllReports match {
        case Success(s) =>
          EmailSender(s.status, s.body).sendEmail
          s3EmailManager.deleteAllEmails
        case Failure(thrown) => logger.error(thrown.getMessage)
      }
    }
  }
}