package com.dhl.datalake.sq

import java.nio.file.{FileAlreadyExistsException, Files, Paths}
import java.util.concurrent.{Executors, TimeUnit}

import com.dhl.datalake.sq.CommandLineArgs.parserOptions
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{from_json, rank}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types._
import io.delta.tables._
import org.apache.spark.sql.expressions.Window

import scala.io.Source

/**
 * Streams change-data messages for a single Kafka topic into a Delta table,
 * applying inserts, updates and deletes through Delta Lake merge (see upsertToDelta).
 */
object DeltaTester {
  val logger: Logger = Logger.getLogger(getClass.getName)

  logger.setLevel(Level.ALL)
  var deltaTable: DeltaTable = _

  /**
   * Starts the application
   * @param args application arguments
   */
  def main(args: Array[String]): Unit = {
    logger.info("Starting application")

    CommandLineArgs.parserArguments().parse(args, parserOptions()) match {

      case Some(config) =>
        val spark = SparkSession
          .builder()
          .appName(config.applicationName)
          .enableHiveSupport()
          .getOrCreate()

        run(config, spark)

      case None =>
        // Argument parsing failed; the parser has already reported the problem
        logger.error("Invalid command line arguments")
    }
  }

  /**
   * Loads the message schema from a file specified by schemaFilePath
   * @param schemaFilePath path to a file containing the message schema as Spark DataType JSON
   * @return Message schema (StructType)
   */
  def getSchema(schemaFilePath: String): StructType = {
    logger.info(s"Reading schema from $schemaFilePath")
    /*
    val input = new ObjectInputStream(new FileInputStream(schemaFilePath))
    val schema = input.readObject().asInstanceOf[StructType]
    input.close()
    */
    val jsonSource = Source.fromFile(schemaFilePath)
    val jsonString: String = jsonSource.mkString
      .trim.replaceFirst("\ufeff", "")
      .replaceAll(" +", "").replaceAll("\n", "")
    jsonSource.close()
    val schema = DataType.fromJson(jsonString).asInstanceOf[StructType]
    schema
  }
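
  /**
   * Illustrative sketch only (not part of the original job): shows the round trip that
   * getSchema expects. The schema file is assumed to hold the JSON produced by
   * StructType.json; the path and field names below are hypothetical.
   */
  def schemaRoundTripExample(): StructType = {
    val sampleSchema = StructType(Seq(
      StructField("key", StringType),
      StructField("sys_change_version", LongType)))
    // Persist the schema as DataType JSON, then read it back through getSchema
    Files.write(Paths.get("/tmp/sample_schema.txt"), sampleSchema.json.getBytes("UTF-8"))
    getSchema("/tmp/sample_schema.txt")
  }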

  /**
   * Normalizes a string by replacing every run of non-word characters with a single underscore
   * @param originalString Source string to normalize
   * @return Normalized string
   */
  def normalizeString(originalString: String): String = {
    originalString.replaceAll("\\W", "_").replaceAll("_+", "_")
  }
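  // Example (for illustration): normalizeString("orders.v1 (EU)") returns "orders_v1_EU_"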

  /**
   * Creates the control (lock) file that keeps the application running; deleting this file
   * later causes queryManager to stop the stream
   * @param path Path to the control file
   */
  def createRunningFlag(path: String): Unit = {
    logger.info("Creating indication file")
    try {
      Files.createFile(Paths.get(path))
    } catch {
      case fe: FileAlreadyExistsException =>
        logger.warn(fe.getMessage)
    }
  }

  /**
   * One-off batch load: reads all available messages from the topic, keeps only the latest
   * change per key and writes the result to the Delta table path used by the streaming job
   * @param spark SparkSession
   * @param topicName Name of the topic to be processed
   * @param schema Message schema
   */
  def loadBatch(spark: SparkSession, topicName: String, schema: StructType): Unit = {

    import spark.implicits._

    val query = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", EnvironmentConfig.config.bootstrapServers)
      .option("subscribe", s"${ApplicationConfig.config.streamName}:${topicName}")
      .option("startingOffsets", "earliest")
      // Note: the Kafka source accepts only one of "subscribe" / "assign", so the explicit
      // partition assignment below is left disabled:
      // .option("assign", s"""{ "${ApplicationConfig.config.streamName}:${topicName}" : [0,1] }""")
      .load()
      .selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value"
      ).as[(String, String)]
      .withColumn("value", from_json($"value", schema))
      .select(
        $"key",
        $"value.message.data.*",
        $"value.message.headers.operation".as("sys_change_operation"),
        $"value.message.headers.changeSequence".as("sys_change_version"))

    val rankBy = Window.partitionBy($"key").orderBy($"sys_change_version".desc)
    val df = query.withColumn("rank", rank().over(rankBy)).where($"rank" === 1).drop($"rank")

    df.write.format("delta").save("/noramdatalake/aggregates2")
  }

  /**
   * Reads the Kafka message stream specified by topicName, parses it using the schema specified
   * by the schema parameter and upserts the parsed messages into the target Delta table via
   * foreachBatch (see upsertToDelta)
   * @param spark SparkSession
   * @param topicName Name of the topic to be processed
   * @param schema Message schema
   * @return handle to the continuously running execution (StreamingQuery)
   */
  def loadStream(
      spark: SparkSession,
      topicName: String,
      schema: StructType): StreamingQuery = {
    import spark.implicits._

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", EnvironmentConfig.config.bootstrapServers)
      .option("subscribe", s"${ApplicationConfig.config.streamName}:${topicName}")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000")
      .load()
      .selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value"
      ).as[(String, String)]
      .withColumn("value", from_json($"value", schema))
      .select(
        $"key",
        $"value.message.data.*",
        $"value.message.headers.operation".as("sys_change_operation"),
        $"value.message.headers.changeSequence".as("sys_change_version"))
      .writeStream
      .queryName(topicName)
      // foreachBatch overrides the "delta" format below; micro-batches go through upsertToDelta
      .format("delta")
      .option("checkpointLocation", s"${ApplicationConfig.config.checkpointBaseFolder}${normalizeString(topicName)}")
      .foreachBatch(upsertToDelta _)
      .outputMode(OutputMode.Update())
      .start()

    query
  }
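
  /*
   * For illustration only: the select in loadBatch/loadStream assumes Kafka message values
   * shaped roughly like the JSON below. The concrete field names under "data" come from the
   * schema file, and the sample payload itself is an assumption based on the selected paths:
   *
   *   {
   *     "message": {
   *       "data": { ... columns of the source table ... },
   *       "headers": { "operation": "UPDATE", "changeSequence": "20191015000000000000001" }
   *     }
   *   }
   */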

  /**
   * Monitors the lock file for the given query and stops all active streams once the file
   * has been deleted
   * @param spark active SparkSession
   * @param query StreamingQuery to be monitored and eventually stopped
   * @return Runnable that performs the monitoring
   */
  def queryManager(spark: SparkSession, query: StreamingQuery): Runnable = new Runnable {
    val SLEEP_TIME_MILLIS = 25000
    var shouldRun: Boolean = true

    override def run(): Unit = {
      logger.info(s"Started query manager for ${query.name}")

      while (shouldRun) {
        Thread.sleep(SLEEP_TIME_MILLIS)
        shouldRun = Files.exists(Paths.get(s"${ApplicationConfig.config.locksBaseFolder}${query.name}.lock"))
      }
      for (activeQuery <- spark.streams.active) {
        logger.info(s"Stopping query ${activeQuery.name}")
        activeQuery.stop()
      }
    }
  }
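
  /**
   * Illustrative sketch only (hypothetical helper, not part of the original job): deleting the
   * lock file created by createRunningFlag is what makes queryManager stop the active streams
   * on its next poll.
   */
  def requestShutdown(topicName: String): Unit = {
    Files.deleteIfExists(Paths.get(s"${ApplicationConfig.config.locksBaseFolder}${topicName}.lock"))
  }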

  /**
   * Merges a micro-batch into the target Delta table: rows flagged DELETE remove the matching
   * key, other matches are updated and new keys are inserted
   * @param microBatchOutputDF micro-batch produced by the streaming query
   * @param batchId identifier of the micro-batch
   */
  def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
    deltaTable.as("t")
      .merge(
        microBatchOutputDF.as("s"),
        "s.key = t.key")
      .whenMatched("s.sys_change_operation = 'DELETE'").delete()
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()
  }
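
  /**
   * Illustrative sketch only (hypothetical path, column set and data): seeds a small Delta
   * table and pushes one micro-batch through upsertToDelta to show the delete / update /
   * insert behaviour of the merge.
   */
  def upsertExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val examplePath = "/tmp/delta_upsert_example"
    // Seed the target table with two existing keys
    Seq(("k1", "v-old", "INSERT", 1L), ("k2", "v-old", "INSERT", 1L))
      .toDF("key", "payload", "sys_change_operation", "sys_change_version")
      .write.format("delta").mode("overwrite").save(examplePath)
    deltaTable = DeltaTable.forPath(spark, examplePath)
    // Micro-batch: k1 is updated, k2 is deleted, k3 is inserted
    val batch = Seq(
      ("k1", "v-new", "UPDATE", 2L),
      ("k2", "v-old", "DELETE", 2L),
      ("k3", "v-new", "INSERT", 1L)
    ).toDF("key", "payload", "sys_change_operation", "sys_change_version")
    upsertToDelta(batch, batchId = 0L)
    // The table now holds k1 -> v-new and k3 -> v-new; k2 has been removed
  }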

  /**
   * Starts execution of the application
   * @param options Application parameters
   * @param spark active SparkSession
   */
  def run(options: parserOptions, spark: SparkSession): Unit = {
    val TERMINATION_AWAIT_TIME = 800

    ConfigurationParser.loadApplicationConfig(options.applicationConfig)
    ConfigurationParser.loadEnvironmentConfig(options.environmentConfig)

    val topicName = options.topicName
    createRunningFlag(s"${ApplicationConfig.config.locksBaseFolder}${topicName}.lock")
    val schema = getSchema(
      s"${ApplicationConfig.config.schemaBaseFolder}${normalizeString(topicName)}.txt")

    //loadBatch(spark, topicName, schema)
    deltaTable = DeltaTable.forPath(spark, "/noramdatalake/aggregates2")

    spark.streams.addListener(new QueryListener())
    val query = loadStream(spark, topicName, schema)
    val executor = Executors.newSingleThreadExecutor()
    executor.submit(queryManager(spark, query))

    logger.info("Awaiting termination")
    for (activeQuery <- spark.streams.active) {
      activeQuery.awaitTermination()
    }

    executor.shutdown()
    try {
      if (!executor.awaitTermination(TERMINATION_AWAIT_TIME, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow()
      }
    } catch {
      case ie: InterruptedException =>
        logger.error(ie.getMessage)
        executor.shutdownNow()
      case unknown: Throwable =>
        logger.error("Got exception: " + unknown)
    }
  }
}