hivefans

CustomDirectKafkaExample.scala

Mar 30th, 2017
package main.scala

import scala.collection.JavaConverters._

import com.typesafe.config.ConfigFactory
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object CustomDirectKafkaExample {
  private val conf = ConfigFactory.load()
  private val sparkStreamingConf = conf.getStringList("CustomDirectKafkaExample-List").asScala
  val sparkConf = new SparkConf()
  val logger = Logger.getLogger(CustomDirectKafkaExample.getClass)

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomDirectKafkaExample <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args
    // set Spark conf parameters from the application config
    sparkConf.setAppName(conf.getString("CustomDirectKafkaExample"))
    sparkStreamingConf.foreach { x => val split = x.split("="); sparkConf.set(split(0), split(1)) }
    val sc = new SparkContext(sparkConf) // create the Spark context
    // checkpoint directory; CHECKPOINT_DIRECTORY_REQUEST is assumed to be defined elsewhere in the project
    val checkpointDir = CHECKPOINT_DIRECTORY_REQUEST
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
    val ssc = setupSsc(topicsSet, kafkaParams, checkpointDir, sc)
    /* start the spark streaming context and block until termination */
    ssc.start()
    ssc.awaitTermination()
  } // main() ends

  def setupSsc(topicsSet: Set[String], kafkaParams: Map[String, String],
               checkpointDir: String, sc: SparkContext): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(conf.getInt("application.sparkbatchinterval")))
    ssc.checkpoint(checkpointDir) // register the checkpoint directory
    /* create direct kafka stream */
    val messages = createCustomDirectKafkaStream(ssc, kafkaParams, "localhost", "/kafka", topicsSet)
    val line = messages.map(_._2) // keep only the message value, drop the key
    val lines = line.flatMap(line => line.split("\n"))
    val filterLines = lines.filter { x => LogFilter.filter(x, "0") } // LogFilter is a project-specific helper, not included in this paste
    filterLines.foreachRDD { (rdd: RDD[String], time: Time) =>
      rdd.foreachPartition { partitionOfRecords =>
        if (partitionOfRecords.isEmpty) {
          logger.info("partitionOfRecords FOUND EMPTY, IGNORING THIS PARTITION")
        } else {
          /* write computation logic here */
        }
      } // partition ends
    }
    ssc
  } // setupSsc() ends
  /* createDirectStream() method overloaded */
  def createCustomDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], zkHosts: String,
                                    zkPath: String, topics: Set[String]): InputDStream[(String, String)] = {
    val topic = topics.last // TODO only for single kafka topic right now
    val zkClient = new ZkClient(zkHosts, 30000, 30000)
    val storedOffsets = readOffsets(zkClient, zkHosts, zkPath, topic)
    val kafkaStream = storedOffsets match {
      case None => // start from the latest offsets
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      case Some(fromOffsets) => // start from previously saved offsets
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
          (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    }
    // save the offsets after each batch
    kafkaStream.foreachRDD(rdd => saveOffsets(zkClient, zkHosts, zkPath, rdd))
    kafkaStream
  }

  /* Read the previously saved offsets from Zookeeper */
  private def readOffsets(zkClient: ZkClient, zkHosts: String, zkPath: String, topic: String):
      Option[Map[TopicAndPartition, Long]] = {
    logger.info("Reading offsets from Zookeeper")
    val stopwatch = new Stopwatch()
    val (offsetsRangesStrOpt, _) = ZkUtils.readDataMaybeNull(zkClient, zkPath)
    offsetsRangesStrOpt match {
      case Some(offsetsRangesStr) =>
        logger.info(s"Read offset ranges: ${offsetsRangesStr}")
        // stored format is "partition:offset,partition:offset,..."
        val offsets = offsetsRangesStr.split(",")
          .map(s => s.split(":"))
          .map { case Array(partitionStr, offsetStr) => TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong }
          .toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offsets)
      case None =>
        logger.info("No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  /* Save the offsets of each partition to Zookeeper so the next run can resume from them */
  private def saveOffsets(zkClient: ZkClient, zkHosts: String, zkPath: String, rdd: RDD[_]): Unit = {
    logger.info("Saving offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}"))
    // serialize as "partition:fromOffset,partition:fromOffset,..."
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
      .mkString(",")
    logger.info(s"Writing offsets to Zookeeper: zkHosts=${zkHosts} zkPath=${zkPath} offsetsRangesStr=${offsetsRangesStr}")
    ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("Done updating offsets in Zookeeper. Took " + stopwatch)
  }

  class Stopwatch {
    private val start = System.currentTimeMillis()
    override def toString() = (System.currentTimeMillis() - start) + " ms"
  }
}
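
For completeness: the object above loads all of its settings through Typesafe Config (ConfigFactory.load()), so an application.conf has to be on the classpath. The sketch below only illustrates the three keys the code actually reads; the values are placeholders and are not part of the original paste (ConfigFactory.parseString is used here purely so the shape of the config can be checked locally).

import com.typesafe.config.ConfigFactory

object ConfigSketch {
  // Placeholder values; only the key names come from CustomDirectKafkaExample itself.
  val sample = ConfigFactory.parseString(
    """
      |CustomDirectKafkaExample = "CustomDirectKafkaExample"        # Spark application name
      |CustomDirectKafkaExample-List = ["spark.executor.memory=2g"] # key=value pairs copied into SparkConf
      |application.sparkbatchinterval = 10                          # batch interval in seconds
      |""".stripMargin)

  def main(args: Array[String]): Unit = {
    println(sample.getString("CustomDirectKafkaExample"))
    println(sample.getStringList("CustomDirectKafkaExample-List"))
    println(sample.getInt("application.sparkbatchinterval"))
  }
}

At runtime the job itself expects two command-line arguments, a comma-separated Kafka broker list (used as metadata.broker.list) and a comma-separated topic list, as read in main() above.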