package com.dhl.datalake.sq

import java.nio.file.{FileAlreadyExistsException, Files, Paths}

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.log4j.Logger
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession}

import com.dhl.datalake.sq.CommandLineArgs.parserOptions
object MirrorMaker {

  val TERMINATION_AWAIT_TIME = 800
  val QUERY_MANAGER_REFRESH_INTERVAL = 10 * 1000 // milliseconds

  /* Logging */
  val logger: Logger = Logger.getLogger(getClass.getName)

  /* Query Manager */
  var queries = ArrayBuffer[QueryManager]()
  object HBaseConnection {
    private var _connection: Connection = null

    def instance(): Connection = {
      if (_connection == null) {
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", EnvironmentConfig.config.hbaseZookeeperQuorum)
        hbaseConf.set("hbase.zookeeper.property.clientPort", EnvironmentConfig.config.hbaseZookeeperPropertyClientPort)
        hbaseConf.set("mapr.hbase.default.db", EnvironmentConfig.config.maprHbaseDefaultDb)
        _connection = ConnectionFactory.createConnection(hbaseConf)
      }
      _connection
    }

    def close(): Unit = {
      if (_connection != null) {
        _connection.close()
      }
    }
  }
  object SparkSessionInstance {
    private var _spark: SparkSession = null

    object implicits extends SQLImplicits {
      protected override def _sqlContext: SQLContext = instance().sqlContext
    }

    def instance(): SparkSession = {
      if (_spark == null) {
        _spark = SparkSession
          .builder()
          .appName("Snapshot Queries - Mirror Maker")
          .enableHiveSupport()
          .getOrCreate()
      }
      _spark
    }

    def close(): Unit = {
      if (_spark != null) {
        _spark.close()
      }
    }
  }

  import SparkSessionInstance.implicits._
  /**
   * Background task that keeps the query managers refreshed and waits until a shutdown
   * is requested (the lock file is deleted) or the Spark context stops.
   */
  class WaitForStop() extends Runnable {
    override def run(): Unit = {
      var isShutdownRequested: Boolean = false
      var isYARNLive: Boolean = true
      val lockFile = s"${ApplicationConfig.config.locksBaseFolder}sq_mirror_maker.lock"

      createRunningFlag(lockFile)

      while (!isShutdownRequested && isYARNLive) {
        queries.foreach { q: QueryManager =>
          q.refresh()
          println(q)
        }
        Thread.sleep(QUERY_MANAGER_REFRESH_INTERVAL)
        isShutdownRequested = !Files.exists(Paths.get(lockFile))
        isYARNLive = !SparkSessionInstance.instance().sparkContext.isStopped
      }

      queries.foreach { _.stop() }
    }
  }
  /** Runs `f` on `resource` and always closes the resource afterwards */
  def using[A <: { def close(): Unit }, B](resource: A)(f: A => B): B = {
    try {
      f(resource)
    } finally {
      resource.close()
    }
  }
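  /* Illustrative use of `using` (the same pattern run() relies on below); the resource
   * is closed even if the body throws. The file path is a placeholder:
   *
   *   val firstLine = using(Source.fromFile("/path/to/some/file")) { src => src.getLines().next() }
   */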
  /**
   * Starts the application
   * @param args application arguments
   */
  def main(args: Array[String]): Unit = {
    CommandLineArgs.parserArguments().parse(args, parserOptions()) match {
      case Some(config) =>
        run(config)
      case None =>
        throw new IllegalArgumentException(CommandLineArgs.parserArguments().renderTwoColumnsUsage)
    }
  }
  /**
   * Loads the message schema from a file specified by schemaFilePath
   * @param schemaFilePath path to a file containing the message schema as Spark DataType JSON
   * @return Message schema (StructType)
   */
  def getSchema(schemaFilePath: String): StructType = {
    val jsonSource = Source.fromFile(schemaFilePath)
    val jsonString: String = jsonSource.mkString
      .trim.replaceFirst("\ufeff", "") // strip a possible UTF-8 BOM
      .replaceAll(" +", "").replaceAll("\n", "")
    jsonSource.close()
    DataType.fromJson(jsonString).asInstanceOf[StructType]
  }
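  /* A schema file is expected to hold Spark's DataType JSON representation. A minimal,
   * purely illustrative example shaped after the columns selected in processStream
   * (the field under "data" is hypothetical):
   *
   * {"type":"struct","fields":[
   *   {"name":"message","type":{"type":"struct","fields":[
   *     {"name":"headers","type":{"type":"struct","fields":[
   *       {"name":"operation","type":"string","nullable":true,"metadata":{}},
   *       {"name":"changeSequence","type":"string","nullable":true,"metadata":{}}
   *     ]},"nullable":true,"metadata":{}},
   *     {"name":"data","type":{"type":"struct","fields":[
   *       {"name":"ID","type":"long","nullable":true,"metadata":{}}
   *     ]},"nullable":true,"metadata":{}}
   *   ]},"nullable":true,"metadata":{}}
   * ]}
   */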
  /**
   * Creates a controlling file that is used for shutting down the application
   * @param path Path to the controlling file
   */
  def createRunningFlag(path: String): Unit = {
    logger.info("Creating indication file")
    try {
      Files.createFile(Paths.get(path))
    } catch {
      // Files.createFile throws java.nio.file.FileAlreadyExistsException when the flag already exists
      case fe: FileAlreadyExistsException =>
        logger.warn(fe.getMessage)
    }
  }
  /**
   * Validates the most recent checkpoint offsets file and removes it if it is empty.
   * Spark occasionally writes an empty offsets entry; restarting from it would fail.
   * @param checkpointFolder checkpoint location of the streaming query
   */
  def offsetChecker(checkpointFolder: String): Unit = {
    try {
      val fs = FileSystem.get(SparkSessionInstance.instance().sparkContext.hadoopConfiguration)
      val files = fs.listStatus(new Path(checkpointFolder + "/offsets"))

      // Offset files are named by batch id, so order them numerically rather than lexicographically
      val checkpoint = files
        .filter(_.getPath.getName.forall(_.isDigit))
        .sortBy(_.getPath.getName.toLong)
        .last
      logger.warn(s"Last checkpoint found for ${checkpointFolder}: ${checkpoint.getPath.getName}")

      val offsets = SparkSessionInstance.instance().sparkContext.textFile(checkpoint.getPath.toString).take(3)(2)
      if (offsets == "{}") {
        logger.warn(s"Invalid checkpoint found ${checkpointFolder}, removing last one!")
        fs.delete(checkpoint.getPath, true)
      } else {
        logger.warn(s"Offsets for ${checkpointFolder} checked and OK. Proceeding!")
      }
    } catch {
      case e: Exception =>
        logger.error(e)
    }
  }
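  /* For reference: an offsets file written by Structured Streaming normally holds three lines,
   * a version marker, a batch metadata JSON, and the per-source offsets JSON
   * (topic -> partition -> offset), roughly:
   *
   *   v1
   *   {"batchWatermarkMs":0,"batchTimestampMs":...,"conf":{...}}
   *   {"<topic>":{"0":<offset>}}
   *
   * offsetChecker reads that third line; a bare "{}" means no offsets were recorded for the
   * batch, which is the corrupt case handled above. Details may vary between Spark versions.
   */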
  /**
   * Reads the Kafka message stream of the given table, parses it using the supplied schema
   * and writes the parsed records through the org.apache.spark.sql.jdbcsink sink
   * @param tableName name of the mirrored table (also the JDBC target table)
   * @param tablePrefix topic/table prefix
   * @param schema Message schema
   * @param driver JDBC driver class
   * @param url JDBC connection string
   * @param dbUser database user
   * @param dbPassword database password
   * @return identifier of the started streaming query
   */
  def processStream(tableName: String, tablePrefix: String, schema: StructType,
                    driver: String, url: String, dbUser: String, dbPassword: String): String = {

    val checkpointFolder = s"${ApplicationConfig.config.checkpointBaseFolder}${tablePrefix.toLowerCase}_${tableName.toLowerCase}"
    val streamTopic = s"${ApplicationConfig.config.streamBaseFolder}:${tablePrefix}.${tableName}"

    // Sometimes Spark manages to write an empty offsets file (most probably a bug).
    // offsetChecker validates the last offsets file and removes it if the issue appears.
    offsetChecker(checkpointFolder)

    SparkSessionInstance.instance().readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", EnvironmentConfig.config.bootstrapServers)
      .option("subscribe", streamTopic)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "100000")
      .load()
      .selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value"
      ).as[(String, String)]
      .withColumn("value", from_json($"value", schema))
      .select(
        $"value.message.headers.operation".as("sys_change_operation"),
        $"value.message.headers.changeSequence".as("sys_change_version"),
        $"key",
        $"value.message.data.*")
      .writeStream
      .outputMode(OutputMode.Append())
      .queryName(s"${tablePrefix}_${tableName}")
      .format("org.apache.spark.sql.jdbcsink")
      .option("numPartitions", "4")
      .option("driver", driver)
      .option("url", url)
      .option("dbtable", tableName)
      .option("user", dbUser)
      .option("password", dbPassword)
      .option("checkpointLocation", checkpointFolder)
      .start()
      .id.toString
  }
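  /* A value payload matching the select above would look roughly like the following
   * (hypothetical values, shaped after the schema example in getSchema):
   *
   *   {"message":{"headers":{"operation":"UPDATE","changeSequence":"20190401000000000000001"},
   *               "data":{"ID":42}}}
   */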
  /**
   * Starts execution of the application
   * @param options Application parameters
   */
  def run(options: parserOptions): Unit = {

    ConfigurationParser.loadEnvironmentConfig(options.environmentConfigPath)
    ConfigurationParser.loadApplicationConfig(options.applicationConfigPath)

    SparkSessionInstance.instance().streams.addListener(new ProgressListener())

    val url = ApplicationConfig.config.dbConnectionString
    val driver = "com.mysql.cj.jdbc.Driver"
    val dbUser = ApplicationConfig.config.dbUserName
    val dbPassword = Crypto.decrypt(
      key = ApplicationConfig.config.masterPass,
      password_file = ApplicationConfig.config.passwordFile)

    // One QueryManager per entry of the table list file (see the sample format below)
    using(Source.fromFile(ApplicationConfig.config.tableListPath)) { source =>
      for (line <- source.getLines()) {
        val columns = line.split(";")
        val tablePrefix = columns(0)
        val tableName = columns(1)
        val topicName = s"${tablePrefix.toLowerCase}_${tableName.toLowerCase}"
        val schema = getSchema(s"${ApplicationConfig.config.schemaBaseFolder}${topicName}.txt")

        queries += new QueryManager(tablePrefix, tableName,
          schema, driver, url, dbUser, dbPassword, processStream)
        logger.info(s"Creating QueryManager for ${tableName}")
      }
    }
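    /* The table list file read above is assumed to hold one "PREFIX;TABLE" pair per line,
     * matching the split on ';' (names are purely illustrative):
     *
     *   CDC;CUSTOMERS
     *   CDC;ORDERS
     *
     * For each entry a schema file named <prefix>_<table>.txt (lower-cased) is expected
     * under schemaBaseFolder.
     */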
    // Register the cleanup hook before blocking on the monitoring thread
    scala.sys.addShutdownHook {
      HBaseConnection.close()
      SparkSessionInstance.close()
    }

    val thread = new Thread(new WaitForStop())
    thread.start()
    thread.join()
  }
}