Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
/**
 * Loads the application's Typesafe configuration once and exposes the parsed
 * typed sub-configurations together with the shared DB connection pools.
 *
 * NOTE(review): every member is an eager `val`, so the first touch of this
 * object parses all sections and opens all pools; a bad section fails fast.
 */
object ConfigLoader {
  val conf = ConfigFactory.load()
  val configs = new ConfigParser(config.ConfigFromTypesafeConf(config.typesafeConf))

  // Typed configuration sections, one per subsystem.
  val managerConfig: HealthCheckManagerConf = configs.parse[HealthCheckManagerConf]()
  val c3DbConf: C3DBConfig = configs.parse[C3DBConfig]()
  val aggregatesDbConf: AggregatesDBConfig = configs.parse[AggregatesDBConfig]()
  val pubDashDBConf: PubDashDBConf = configs.parse[PubDashDBConf]()
  val impalaDbConf: ImpalaDbConf = configs.parse[ImpalaDbConf]()
  val feedStateDbConf: FeedStateDBConf = configs.parse[FeedStateDBConf]()
  val s3credentials: S3Credentials = configs.parse[S3Credentials]()
  val anAuthConf: AppnexusAuthConf = configs.parse[AppnexusAuthConf]()
  val asyncS3ClientConf: AsyncS3ClientConf = configs.parse[AsyncS3ClientConf]()
  val feedStatsConfig: FeedStatsConfig = configs.parse[FeedStatsConfig]()

  // Named connection pools backed by the sections above. The pool names must
  // match the NamedDB(...) lookups used elsewhere (e.g. for closing them).
  val aggregatesConnPool = ConnectionPool("aggregates", aggregatesDbConf)
  val c3ConnPool = ConnectionPool("c3", c3DbConf)
  val pubDashPool = ConnectionPool("pubdash", pubDashDBConf)
  val impalaDbConnPool = ImpalaConnectionPool(impalaDbConf)
  val feedstateConnPool = ConnectionPool("feedstate", feedStateDbConf)

  // Plain settings read straight from the raw config.
  val anproxyUrl: String = conf.getString("anproxy_url")
  val parquetReadParallelism: Int = conf.getInt("parquet_read_parallelism")
}
/**
 * Entry point for the health-check job.
 *
 * Builds a local Spark session, configures Hadoop for S3A and GCS access,
 * runs the full suite of discrepancy tests, uploads each per-test report to
 * S3 and finally mails an aggregated report.
 */
object HealthCheckManager extends CaptifyLogging {

  import ConfigLoader._

  implicit val execCtx = GlobalExecutionContextProvider.get()

  // The job is small enough to run Spark locally on 4 cores.
  val sparkConf = new SparkConf().setAppName("health-check").setMaster("local[4]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  // Hadoop filesystem wiring: S3A credentials come from config, GCS uses a
  // service-account keyfile.
  val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
  hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  hadoopConf.set("fs.s3a.awsAccessKeyId", s3credentials.accessKey)
  hadoopConf.set("fs.s3a.awsSecretAccessKey", s3credentials.secretKey)
  hadoopConf.set("fs.gs.project.id", "event-feeds-dbm-dt-files")
  hadoopConf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  hadoopConf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
  hadoopConf.set("fs.gs.implicit.dir.repair.enable", "false")
  hadoopConf.set("google.cloud.auth.service.account.enable", "true")
  hadoopConf.set("google.cloud.auth.service.account.email", "...")
  // NOTE(review): "./provision,,,," looks garbled or placeholder — confirm the
  // real keyfile path (and the "..." email above) before deploying.
  hadoopConf.set("google.cloud.auth.service.account.keyfile", new File("./provision,,,,").getCanonicalPath)

  // Lazy: only created if the run actually reaches report upload / HTTP use.
  lazy val ningProvider = new NingAsyncClientProviderImpl()
  lazy val s3EmailManager = new S3EmailManager(sparkSession.sparkContext)

  /**
   * Runs all checks, then releases shared resources.
   *
   * Cleanup sits in a `finally` block so the Spark session, the named DB
   * connection pools and the async HTTP client are closed even when a check
   * throws (previously a failure leaked all of them).
   */
  def main(args: Array[String]): Unit = {
    try {
      runFeedStatsCheck
      runOtherDiscrepancyChecks()
      sendEmailWithAllDiscrepancies()
    } finally {
      sparkSession.stop()
      NamedDB("aggregates").close()
      NamedDB("c3").close()
      NamedDB("pubdash").close()
      NamedDB("impala").close()
      NamedDB("feedstate").close()
      ningProvider.close()
    }
  }

  /**
   * One standard-feed discrepancy test per (source metric, Impala metric)
   * pair: 5 days back for DBM (Europe/London days) and 2 days back for
   * Appnexus (UTC days).
   */
  val standardFeedTestList: IndexedSeq[StandardFeedDiscrepancyTest] = {
    val daysBeforeDBM = (1 to 5).map(DateTime.now().minusDays(_).withZone(DateTimeZone.forID("Europe/London")).withTimeAtStartOfDay())
    val daysBeforeAppnexus = (1 to 2).map(DateTime.now().minusDays(_).withZone(DateTimeZone.UTC).withTimeAtStartOfDay())
    lazy val dbmApi = DBMApi
    // A one-day window [date, date + 1) for the given source.
    def getMetricForSource(source: DataSource, date: DateTime) = MetricData(source, date, date.plusDays(1))
    // NOTE(review): DBM metrics use a zero-length window (date, date) while all
    // other sources span a full day — confirm this asymmetry is intentional.
    val dbm_metrics = daysBeforeDBM.map(date => MetricData(dbmApi, date, date))
    lazy val appnexusApi = new AppnexusApi()
    val appnexusMetrics = daysBeforeAppnexus.map(date => getMetricForSource(appnexusApi, date))
    lazy val impalaDBM = new ImpalaDbDBM()
    lazy val impalaDbmMetrics = daysBeforeDBM.map(date => getMetricForSource(impalaDBM, date))
    lazy val impalaAppnexus = new ImpalaDbAppnexus()
    lazy val impalaAppnexusMetrics = daysBeforeAppnexus.map(date => getMetricForSource(impalaAppnexus, date))
    ((appnexusMetrics zip impalaAppnexusMetrics) ++
      (dbm_metrics zip impalaDbmMetrics))
      .map { case (m1, m2) => new StandardFeedDiscrepancyTest(m1, m2) }
  }

  /**
   * Builds the remaining discrepancy tests (DSP line items, campaign-manager
   * stats, dictionaries, attribution, replication, pubdash, entities), runs
   * each one, writes its result file and uploads it to S3.
   */
  def runOtherDiscrepancyChecks(): Unit = {
    val dbm = new DbmLineItemSource()
    val aggregates = new AggregatesLineItemSource(Dsp.DBM)
    val dspLineItemsDBMTests = (1 to 5).map(DateTime.now().minusDays)
      .map(new DspLineItemTest(dbm, aggregates, _))
    val campaignManagerStatsTests = (1 to 2).map(DateTime.now().minusDays)
      .map(new CampaignManagerStatsTest(CampaignManager, ImpalaStandardFeedLineItemsCampaign, _))
    val dictionaryTests = Seq(
      OSAppnexusDictionarySource,
      OSDBMDictionarySource,
      BrowserAppnexusDictionarySource,
      BrowserDBMDictionarySource,
      DeviceAppnexusDictionarySource
    ).map(new DictionaryTest(_))
    val formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    // NOTE(review): these fixed dates feed a test that is commented out of the
    // run below, yet the test objects are still constructed eagerly here —
    // either re-enable the test or drop this setup.
    val days = IndexedSeq(
      "2016-09-16 08:00:00",
      "2016-11-18 08:00:00"
    ).map(
      formatter.parseDateTime(_).withZone(DateTimeZone.UTC).withTimeAtStartOfDay()
    )
    val attributionDiscrepancyTestForSpecificDate = days.map(new AttributionDiscrepancyImpalaTest(_, Dsp.DBM))
    val attributionDiscrepancyImpalaTest = for {
      dayBefore <- 1 to 1
      dsp <- Dsp.values
    } yield new AttributionDiscrepancyImpalaTest(DateTime.now().minusDays(dayBefore), dsp)
    // NOTE(review): `today` is not truncated to start-of-day while `yesterday`
    // is — confirm the asymmetry is intentional for the tests below.
    val today = DateTime.now()
    val yesterday = DateTime.now().minusDays(1).withZone(DateTimeZone.UTC).withTimeAtStartOfDay()
    (standardFeedTestList
      // ++ attributionDiscrepancyTestForSpecificDate
      ++ attributionDiscrepancyImpalaTest
      ++ Seq(
        new RawStandardFeedTest("Appnexus", ANRawStandardFeedSource, ANEDBStandardFeedSource, yesterday, today),
        new RawStandardFeedTest("DBM", DBMRawStandardFeedSource, DBMEDBStandardFeedSource, yesterday, today),
        new VideoFeedTest("Appnexus", VideoFeedApiAppnexusSource, EDBVideoFeedAppnexusSource, yesterday, today),
        new AttributionSummaryImpalaTest(DateTime.now().minusWeeks(1), yesterday, Dsp.Appnexus),
        new AttributionSummaryImpalaTest(DateTime.now().minusWeeks(1), yesterday, Dsp.DBM),
        new ReplicationDiscrepancyTest(),
        new PubdashDiscrepancyTest(new PubDashSchemaSource, new ImpalaDbSource, true),
        new StatsLineItemTest(new StandardFeedLineItemsImpala(), new StatsLineItemTotalsC3(), true),
        new EntityDiscrepancyTest(Campaigns, yesterday),
        new EntityDiscrepancyTest(Profiles, yesterday),
        new DBMEntityTest(sparkSession, LineItemEntity, today),
        new DBMEntityTest(sparkSession, PixelEntity, today),
        new DBMEntityTest(sparkSession, InsertionOrderEntity, today)
      ) ++ dspLineItemsDBMTests
      ++ campaignManagerStatsTests
      ++ dictionaryTests)
      .zipWithIndex
      .flatMap { case (test, i) => test.runAndWriteResultToFile(i) }
      .foreach(s3EmailManager.saveFileToS3)
  }

  /**
   * When enabled in config, fetches all reports from S3 and mails them as a
   * single email; the reports are deleted from S3 only after a send attempt.
   * Failure to read the reports is logged, not rethrown.
   */
  def sendEmailWithAllDiscrepancies(): Unit = {
    if (managerConfig.sendEmailEnabled) {
      s3EmailManager.getAllReports match {
        case Success(s) =>
          EmailSender(s.status, s.body).sendEmail
          s3EmailManager.deleteAllEmails
        case Failure(thrown) => logger.error(thrown.getMessage)
      }
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement