Spark Streaming Application

a guest, Mar 26th, 2015
Scala
import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val GROUP: String = "kafkastreaming"
val NUM_THREADS_PER_TOPIC: Int = 1

// Consumer configuration for the receiver-based Kafka API.
// ZOOKEEPER_URL and TOPIC_NAME are assumed to be defined elsewhere in the application.
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> ZOOKEEPER_URL,
  "group.id" -> GROUP,
  "zookeeper.connection.timeout.ms" -> "1000",
  "auto.offset.reset" -> "smallest")

var conf = new SparkConf(true).setAppName("KafkaSparkIntegration")
conf.set("spark.streaming.blockInterval", "625") // in milliseconds
conf.set("spark.streaming.unpersist", "true") // Just to be sure.

// Cap the per-receiver ingestion rate unless the caller already set one.
conf.getOption("spark.streaming.receiver.maxRate") match {
  case Some(value) =>
    println(s"spark.streaming.receiver.maxRate is set to $value")
  case None =>
    println("spark.streaming.receiver.maxRate was not set. Setting it to 15000 records/sec")
    conf = conf.set("spark.streaming.receiver.maxRate", "15000")
}

// Make sure old RDDs and metadata are eventually cleaned up.
conf.getOption("spark.cleaner.ttl") match {
  case Some(value) =>
    println(s"spark.cleaner.ttl is set to $value")
  case None =>
    println("spark.cleaner.ttl was not set. Setting it to 5 minutes (300 seconds)")
    conf = conf.set("spark.cleaner.ttl", "300")
}

val sparkContext = new SparkContext(conf)
val context = new StreamingContext(sparkContext, Seconds(5))
context.remember(Minutes(1)) // In case the processing lasts more than 5 seconds.

// Create one Kafka receiver per executor, then union the resulting streams.
val nbExecutors = context.sparkContext.getConf.getInt("spark.executor.instances", 1)
val streams = (1 to nbExecutors).map { _ =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      context,
      kafkaParams,
      Map(TOPIC_NAME -> NUM_THREADS_PER_TOPIC),
      StorageLevel.MEMORY_AND_DISK_SER_2)
    .map(_._2) // Keep only the message value, drop the key.
}

val unifiedStream = context.union(streams).repartition(nbExecutors)

// Count the records of each batch with accumulators; `outside` keeps a
// running total on the driver across batches.
var outside = 0L
unifiedStream
  .map(JsonParser.to(classOf[Event], _)) // Event and JsonParser are application-specific.
  .foreachRDD { rdd =>
    val acc = sparkContext.accumulator(0L, "Accumulator")
    val partIndex = sparkContext.accumulator(0, "Partition Index")
    rdd.foreachPartition { partitionData =>
      var counter = 0L
      partitionData.foreach(_ => counter += 1L)
      acc += counter
      partIndex += 1
    }
    outside += acc.value
    rdd.unpersist() // Just to be sure.
    println(outside + " : " + acc.value + " : " + acc.localValue + " : " + partIndex.value + " : " + partIndex.localValue)
  }

context.start()
context.awaitTermination()
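
The snippet references ZOOKEEPER_URL, TOPIC_NAME, Event and JsonParser without defining them. A minimal sketch of what those definitions might look like is given below, purely as an assumption to make the example self-contained; the real values, event schema and JSON library are not part of the original paste (Jackson is used here only as one possible choice).

// Hypothetical surrounding definitions, not part of the original paste.
val ZOOKEEPER_URL: String = "localhost:2181" // assumed ZooKeeper quorum
val TOPIC_NAME: String = "events"            // assumed Kafka topic

// Assumed shape of the incoming events; the real class is application-specific.
case class Event(id: String, timestamp: Long, payload: String)

// Placeholder for the JSON helper used in the stream, backed here by Jackson
// with the Scala module as one possible implementation.
object JsonParser {
  import com.fasterxml.jackson.databind.ObjectMapper
  import com.fasterxml.jackson.module.scala.DefaultScalaModule

  private val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  def to[T](clazz: Class[T], json: String): T = mapper.readValue(json, clazz)
}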