package com.dhl.datalake.sq

import java.nio.file.{FileAlreadyExistsException, Files, Paths}
import java.util.concurrent.{Executors, TimeUnit}

import com.dhl.datalake.sq.CommandLineArgs.parserOptions

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{from_json, rank}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
import io.delta.tables._

import scala.io.Source
/**
 * Streaming job that reads change messages for a Kafka topic, parses them with a JSON schema
 * and merges every micro-batch into a Delta table (upserts and deletes keyed by message key).
 */
object DeltaTester {

  val logger: Logger = Logger.getLogger(getClass.getName)
  logger.setLevel(Level.ALL)

  var deltaTable: DeltaTable = _
  /**
   * Starts the application
   * @param args application arguments
   */
  def main(args: Array[String]): Unit = {
    logger.info("Starting application")

    CommandLineArgs.parserArguments().parse(args, parserOptions()) match {
      case Some(config) =>
        val spark = SparkSession
          .builder()
          .appName(config.applicationName)
          .enableHiveSupport()
          .getOrCreate()

        run(config, spark)

      case None =>
        // argument parsing failed; there is no configuration to run with
        logger.error("Invalid command line arguments, exiting")
    }
  }
  /**
   * Loads the message schema from a file specified by schemaFilePath
   * @param schemaFilePath path to a file containing the message schema as JSON
   * @return message schema (StructType)
   */
  def getSchema(schemaFilePath: String): StructType = {
    logger.info(s"Reading schema from $schemaFilePath")

    /*
    val input = new ObjectInputStream(new FileInputStream(schemaFilePath))
    val schema = input.readObject().asInstanceOf[StructType]
    input.close()
    */

    val jsonSource = Source.fromFile(schemaFilePath)
    val jsonString: String = jsonSource.mkString
      .trim.replaceFirst("\ufeff", "")           // drop a UTF-8 BOM if present
      .replaceAll(" +", "").replaceAll("\n", "") // compact the JSON before parsing
    jsonSource.close()

    DataType.fromJson(jsonString).asInstanceOf[StructType]
  }
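  // The schema file above is assumed to hold the JSON form that Spark's StructType.json produces,
  // which DataType.fromJson can parse back. A hypothetical helper (not part of the original job)
  // that writes such a file might look like this:
  def writeSchemaFile(schema: StructType, schemaFilePath: String): Unit = {
    // schema.json yields the compact JSON representation expected by getSchema
    Files.write(Paths.get(schemaFilePath), schema.json.getBytes("UTF-8"))
  }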
  /**
   * Normalizes a string by replacing all non-word characters with underscores and collapsing
   * repeated underscores
   * @param originalString source string to normalize
   * @return normalized string
   */
  def normalizeString(originalString: String): String = {
    originalString.replaceAll("\\W", "_").replaceAll("_+", "_")
  }
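  // Example (illustrative): normalizeString("orders.cdc-topic") returns "orders_cdc_topic"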
  /**
   * Creates the lock file that keeps the application running; deleting the file later triggers shutdown
   * @param path path to the lock file
   */
  def createRunningFlag(path: String): Unit = {
    logger.info("Creating indication file")
    try {
      Files.createFile(Paths.get(path))
    } catch {
      case fe: FileAlreadyExistsException =>
        logger.warn(fe.getMessage)
    }
  }
  /**
   * Batch-loads a Kafka topic, keeps the latest change per key and writes the result
   * as the target Delta table (initial load)
   * @param spark SparkSession
   * @param topicName name of the topic to be processed
   * @param schema message schema
   */
  def loadBatch(spark: SparkSession, topicName: String, schema: StructType): Unit = {
    import spark.implicits._

    val query = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", EnvironmentConfig.config.bootstrapServers)
      // "subscribe" and "assign" are mutually exclusive for the Kafka source; to read only
      // selected partitions, replace "subscribe" with an "assign" option instead of combining them
      .option("subscribe", s"${ApplicationConfig.config.streamName}:${topicName}")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value"
      ).as[(String, String)]
      .withColumn("value", from_json($"value", schema))
      .select(
        $"key",
        $"value.message.data.*",
        $"value.message.headers.operation".as("sys_change_operation"),
        $"value.message.headers.changeSequence".as("sys_change_version"))

    // keep only the most recent change per key
    val rankBy = Window.partitionBy($"key").orderBy($"sys_change_version".desc)
    val df = query.withColumn("rank", rank().over(rankBy)).where($"rank" === 1).drop($"rank")

    df.write.format("delta").save("/noramdatalake/aggregates2")
  }
  /**
   * Reads the Kafka message stream specified by topicName, parses it with the schema specified
   * by the schema parameter and merges every micro-batch into the target Delta table via upsertToDelta
   * @param spark SparkSession
   * @param topicName name of the topic to be processed
   * @param schema message schema
   * @return handle to the continuously running execution (StreamingQuery)
   */
  def loadStream(
      spark: SparkSession,
      topicName: String,
      schema: StructType): StreamingQuery = {

    import spark.implicits._

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", EnvironmentConfig.config.bootstrapServers)
      .option("subscribe", s"${ApplicationConfig.config.streamName}:${topicName}")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()
      .selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value"
      ).as[(String, String)]
      .withColumn("value", from_json($"value", schema))
      .select(
        $"key",
        $"value.message.data.*",
        $"value.message.headers.operation".as("sys_change_operation"),
        $"value.message.headers.changeSequence".as("sys_change_version"))
      .writeStream
      .queryName(topicName)
      .format("delta") // superseded by foreachBatch below, which performs the Delta merge itself
      .option("checkpointLocation", s"${ApplicationConfig.config.checkpointBaseFolder}${normalizeString(topicName)}")
      .foreachBatch(upsertToDelta _)
      .outputMode(OutputMode.Update())
      .start()

    query
  }
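  // The select above assumes a change-message envelope along these lines (header field names come
  // from the code; the columns under "data" are illustrative only):
  //   {
  //     "message": {
  //       "data":    { "id": 42, "amount": 10.5 },
  //       "headers": { "operation": "UPDATE", "changeSequence": "20240101000000000000001" }
  //     }
  //   }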
  /**
   * Controls whether stream processing should keep running, based on a lock file; once the file
   * is deleted, all active queries are stopped
   * @param spark active SparkSession
   * @param query StreamingQuery to be monitored and eventually stopped
   * @return instance of Runnable class
   */
  def queryManager(spark: SparkSession, query: StreamingQuery): Runnable = new Runnable {
    val SLEEP_TIME_MILLIS = 25000
    var shouldRun: Boolean = true

    override def run(): Unit = {
      logger.info(s"Started query manager for ${query.name}")

      while (shouldRun) {
        Thread.sleep(SLEEP_TIME_MILLIS)
        shouldRun = Files.exists(Paths.get(s"${ApplicationConfig.config.locksBaseFolder}${query.name}.lock"))
      }

      for (query <- spark.streams.active) {
        logger.info(s"Stopping query ${query.name}")
        query.stop()
      }
    }
  }
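  // To shut the job down gracefully, delete the lock file created by createRunningFlag,
  // e.g. (illustrative path): hdfs dfs -rm <locksBaseFolder>/<topicName>.lock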
  /**
   * Merges one micro-batch into the target Delta table: keys whose latest operation is DELETE are
   * removed, existing keys are updated and new keys are inserted
   * @param microBatchOutputDF micro-batch produced by the streaming query
   * @param batchId identifier of the micro-batch (unused)
   */
  def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
    deltaTable.as("t")
      .merge(
        microBatchOutputDF.as("s"),
        "s.key = t.key")
      .whenMatched("s.sys_change_operation = 'DELETE'").delete()
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()
  }
  /**
   * Starts execution of the application
   * @param options application parameters
   * @param spark active SparkSession
   */
  def run(options: parserOptions, spark: SparkSession): Unit = {
    val TERMINATION_AWAIT_TIME = 800

    ConfigurationParser.loadApplicationConfig(options.applicationConfig)
    ConfigurationParser.loadEnvironmentConfig(options.environmentConfig)

    val topicName = options.topicName

    createRunningFlag(s"${ApplicationConfig.config.locksBaseFolder}${topicName}.lock")

    val schema = getSchema(
      s"${ApplicationConfig.config.schemaBaseFolder}${normalizeString(topicName)}.txt")

    //loadBatch(spark, topicName, schema)

    deltaTable = DeltaTable.forPath(spark, "/noramdatalake/aggregates2")

    spark.streams.addListener(new QueryListener())

    val query = loadStream(spark, topicName, schema)

    val executor = Executors.newSingleThreadExecutor()
    executor.submit(queryManager(spark, query))

    logger.info("Awaiting termination")
    for (query <- spark.streams.active) {
      query.awaitTermination()
    }

    executor.shutdown()
    try {
      if (!executor.awaitTermination(TERMINATION_AWAIT_TIME, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow()
      }
    } catch {
      case ie: InterruptedException =>
        logger.error(ie.getMessage)
        executor.shutdownNow()
      case unknown: Throwable =>
        logger.error("Got exception: " + unknown)
    }
  }
}
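// QueryListener is referenced in run() but not included in this paste. A minimal sketch of what it
// might look like (an assumption, not the original implementation) that simply logs lifecycle events:
class QueryListener extends org.apache.spark.sql.streaming.StreamingQueryListener {
  import org.apache.spark.sql.streaming.StreamingQueryListener._

  private val listenerLogger: Logger = Logger.getLogger(getClass.getName)

  override def onQueryStarted(event: QueryStartedEvent): Unit =
    listenerLogger.info(s"Query started: ${event.name} (${event.id})")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    listenerLogger.info(s"Query progress: ${event.progress.prettyJson}")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    listenerLogger.info(s"Query terminated: ${event.id}")
}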