package com.dhl.datalake.sq

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.io.Source
import com.dhl.datalake.sq.CommandLineArgs.parserOptions
import org.apache.commons.io.FileExistsException
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

import scala.collection.mutable.ArrayBuffer

object MirrorMaker {

  val TERMINATION_AWAIT_TIME = 800
  val QUERY_MANAGER_REFRESH_INTERVAL = 10 * 1000 // milliseconds

  /* Logging */
  val logger: Logger = Logger.getLogger(getClass.getName)

  /* Active query managers */
  var queries = ArrayBuffer[QueryManager]()

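  /** Lazily created, process-wide HBase connection shared by the application. */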
  object HBaseConnection {

    private var _connection: Connection = null

    def instance(): Connection = {

      if (_connection == null) {
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", EnvironmentConfig.config.hbaseZookeeperQuorum)
        hbaseConf.set("hbase.zookeeper.property.clientPort", EnvironmentConfig.config.hbaseZookeeperPropertyClientPort)
        hbaseConf.set("mapr.hbase.default.db", EnvironmentConfig.config.maprHbaseDefaultDb)
        _connection = ConnectionFactory.createConnection(hbaseConf)
      }
      _connection
    }

    def close(): Unit = {
      if (_connection != null) {
        _connection.close()
      }
    }
  }

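  /** Lazily created, shared SparkSession with Hive support enabled. */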
  object SparkSessionInstance {

    private var _spark: SparkSession = null

    object implicits extends SQLImplicits {
      protected override def _sqlContext: SQLContext = instance.sqlContext
    }

    def instance(): SparkSession = {

      if (_spark == null) {
        _spark = SparkSession
          .builder()
          .appName("Snapshot Queries - Mirror Maker")
          .enableHiveSupport()
          .getOrCreate()
      }
      _spark
    }

    def close(): Unit = {
      if (_spark != null) {
        _spark.close()
      }
    }
  }

  import SparkSessionInstance.implicits._

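  /**
   * Monitoring loop: periodically refreshes all query managers and stops them
   * once the lock file is removed or the Spark context has been stopped.
   */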
  class waitForStop() extends Runnable {

    override def run(): Unit = {

      var isShutdownRequested: Boolean = false
      var isYARNLive: Boolean = true
      val lockFile = s"${ApplicationConfig.config.locksBaseFolder}sq_mirror_maker.lock"
      createRunningFlag(lockFile)

      while (!isShutdownRequested && isYARNLive) {
        queries.foreach { q: QueryManager =>
          q.refresh()
          println(q)
        }
        Thread.sleep(QUERY_MANAGER_REFRESH_INTERVAL)
        isShutdownRequested = !Files.exists(Paths.get(lockFile))
        isYARNLive = !SparkSessionInstance.instance.sparkContext.isStopped
      }
      queries.foreach { _.stop() }
    }
  }

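  /** Loan pattern helper: applies `f` to `resource` and guarantees the resource is closed afterwards. */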
  def using[A <: { def close(): Unit }, B](resource: A)(f: A => B): B = {
    try {
      f(resource)
    } finally {
      resource.close()
    }
  }

  /**
   * Starts the application
   * @param args application arguments
   */
  def main(args: Array[String]): Unit = {
    CommandLineArgs.parserArguments().parse(args, parserOptions()) match {
      case Some(config) =>
        run(config)
      case None =>
        throw new IllegalArgumentException(CommandLineArgs.parserArguments().renderTwoColumnsUsage)
    }
  }

  /**
   * Loads message schema from a file specified by schemaFilePath
   * @param schemaFilePath path to a file containing message schema
   * @return Message schema (StructType)
   */
  def getSchema(schemaFilePath: String): StructType = {
    val jsonSource = Source.fromFile(schemaFilePath)
    val jsonString: String = jsonSource.mkString
      .trim.replaceFirst("\ufeff", "")
      .replaceAll(" +", "").replaceAll("\n", "")
    jsonSource.close()
    val schema = DataType.fromJson(jsonString).asInstanceOf[StructType]
    schema
  }

  /**
   * Creates the lock file whose removal signals the application to shut down
   * @param path Path to the lock file
   */
  def createRunningFlag(path: String): Unit = {
    logger.info("Creating indication file")
    try {
      Files.createFile(Paths.get(path))
    } catch {
      // Files.createFile reports an existing file with java.nio.file.FileAlreadyExistsException
      case fe: java.nio.file.FileAlreadyExistsException =>
        logger.warn(fe.getMessage)
    }
  }

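  /**
   * Guards against a corrupted Spark checkpoint: inspects the newest file under
   * <checkpointFolder>/offsets and deletes it if it contains no offsets ("{}").
   */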
  def offsetChecker(checkpointFolder: String): Unit = {
    try {
      val fs = FileSystem.get(SparkSessionInstance.instance.sparkContext.hadoopConfiguration)
      val files = fs.listStatus(new Path(checkpointFolder + "/offsets"))
      val checkpoint = files.sortWith(_.getPath.getName < _.getPath.getName).last
      logger.warn(s"Last checkpoint found for ${checkpointFolder}: ${checkpoint.getPath.getName}")

      val offsets = SparkSessionInstance.instance.sparkContext.textFile(checkpoint.getPath.toString).take(3)(2)
      if (offsets == "{}") {
        logger.warn(s"Invalid checkpoint found ${checkpointFolder}, removing last one!")
        fs.delete(checkpoint.getPath)
      } else {
        logger.warn(s"Offsets for ${checkpointFolder} checked and OK. Proceeding!")
      }
    } catch {
      case e: Exception =>
        logger.error(e)
    }
  }

  /**
   * Reads the Kafka message stream for the given table, parses the messages using the supplied schema
   * and writes the parsed records through the org.apache.spark.sql.jdbcsink sink
   * @param tableName name of the mirrored table
   * @param tablePrefix topic/table prefix
   * @param schema Message schema
   * @param driver JDBC driver class
   * @param url JDBC connection string
   * @param dbUser database user
   * @param dbPassword database password
   * @return id of the started StreamingQuery (as a String)
   */
  def processStream(tableName: String, tablePrefix: String, schema: StructType,
                    driver: String, url: String, dbUser: String, dbPassword: String): String = {

    val checkpointFolder = s"${ApplicationConfig.config.checkpointBaseFolder}${tablePrefix.toLowerCase}_${tableName.toLowerCase}"
    val streamTopic = s"${ApplicationConfig.config.streamBaseFolder}:${tablePrefix}.${tableName}"

    // Spark occasionally writes an empty offsets file (most probably a bug).
    // offsetChecker inspects the latest offsets and removes the broken checkpoint if needed.
    offsetChecker(checkpointFolder)

    SparkSessionInstance.instance.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", EnvironmentConfig.config.bootstrapServers)
      .option("subscribe", streamTopic)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "100000")
      .load()
      .selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value"
      ).as[(String, String)]
      .withColumn("value", from_json($"value", schema))
      .select(
        $"value.message.headers.operation".as("sys_change_operation"),
        $"value.message.headers.changeSequence".as("sys_change_version"),
        $"key",
        $"value.message.data.*")
      .writeStream
      .outputMode(OutputMode.Append())
      .queryName(s"${tablePrefix}_${tableName}")
      .format("org.apache.spark.sql.jdbcsink")
      .option("numPartitions", "4")
      .option("driver", driver)
      .option("url", url)
      .option("dbtable", tableName)
      .option("user", dbUser)
      .option("password", dbPassword)
      .option("checkpointLocation", checkpointFolder)
      .start()
      .id.toString
  }

  /**
   * Starts execution of the application
   * @param options Application parameters
   */
  def run(options: parserOptions): Unit = {

    ConfigurationParser.loadEnvironmentConfig(options.environmentConfigPath)
    ConfigurationParser.loadApplicationConfig(options.applicationConfigPath)

    SparkSessionInstance.instance.streams.addListener(new ProgressListener())

    val url = ApplicationConfig.config.dbConnectionString
    val driver = "com.mysql.cj.jdbc.Driver"
    val dbUser = ApplicationConfig.config.dbUserName
    val dbPassword = Crypto.decrypt(
      key = ApplicationConfig.config.masterPass,
      password_file = ApplicationConfig.config.passwordFile)

    using(Source.fromFile(ApplicationConfig.config.tableListPath)) { source =>
      for (line <- source.getLines) {
        val fields = line.split(";")
        val tablePrefix = fields(0)
        val tableName = fields(1)
        val topicName = s"${tablePrefix.toLowerCase}_${tableName.toLowerCase}"
        val schema = getSchema(s"${ApplicationConfig.config.schemaBaseFolder}${topicName}.txt")

        logger.info(s"Creating QueryManager for ${tableName}")
        queries += new QueryManager(tablePrefix, tableName,
          schema, driver, url, dbUser, dbPassword, processStream)
      }
    }

    // Register the shutdown hook before blocking on the monitoring thread so that
    // resources are released even if the JVM is terminated while the queries run.
    scala.sys.addShutdownHook {
      HBaseConnection.close()
      SparkSessionInstance.close()
    }

    val thread = new Thread(new waitForStop())
    thread.start()
    thread.join()
  }
}