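// Receiver-based Kafka -> Spark Streaming integration (old Kafka 0.8 consumer API).
// One receiver per executor reads TOPIC_NAME, the per-receiver streams are unioned,
// and every 5-second batch is counted with accumulators. ZOOKEEPER_URL, TOPIC_NAME,
// Event and JsonParser come from the surrounding project; the constants are stubbed
// with placeholder values below.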
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// ZOOKEEPER_URL and TOPIC_NAME were defined elsewhere in the original project;
// the values here are placeholders.
val ZOOKEEPER_URL: String = "localhost:2181"
val TOPIC_NAME: String = "events"
val GROUP: String = "kafkastreaming"
val NUM_THREADS_PER_TOPIC: Int = 1
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> ZOOKEEPER_URL,
  "group.id" -> GROUP,
  "zookeeper.connection.timeout.ms" -> "1000",
  "auto.offset.reset" -> "smallest")
val conf = new SparkConf(true).setAppName("KafkaSparkIntegration")
conf.set("spark.streaming.blockInterval", "625")
conf.set("spark.streaming.unpersist", "true") // Just to be sure.
conf.getOption("spark.streaming.receiver.maxRate") match {
  case Some(value) => println(s"spark.streaming.receiver.maxRate is set to $value")
  case None =>
    println(s"spark.streaming.receiver.maxRate was not set. Setting it to 15000 records/sec")
    conf.set("spark.streaming.receiver.maxRate", "15000")
}
conf.getOption("spark.cleaner.ttl") match {
  case Some(value) => println(s"spark.cleaner.ttl is set to $value")
  case None =>
    println(s"spark.cleaner.ttl was not set. Setting it to 5 minutes (300 seconds)")
    conf.set("spark.cleaner.ttl", "300")
}
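// With a 5 s batch interval and a 625 ms block interval, each receiver produces
// 5000 / 625 = 8 blocks (i.e. 8 RDD partitions) per batch. With maxRate capped at
// 15000 records/sec, a receiver admits at most 15000 * 5 = 75000 records per batch.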
val sparkContext = new SparkContext(conf)
val context = new StreamingContext(sparkContext, Seconds(5))
context.remember(Minutes(1)) // When the processing lasts more than 5 seconds.
val nbExecutors = context.sparkContext.getConf.getInt("spark.executor.instances", 1)
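// One receiver per executor: each createStream call below allocates a long-running
// receiver task, so creating nbExecutors streams spreads ingestion across the cluster.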
val streams = (1 to nbExecutors).map { _ =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    context,
    kafkaParams,
    Map(TOPIC_NAME -> NUM_THREADS_PER_TOPIC),
    StorageLevel.MEMORY_AND_DISK_SER_2)
    .map(_._2) // Keep only the message value, dropping the key.
}
val unifiedStream = context.union(streams).repartition(nbExecutors)
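// union merges the per-receiver DStreams into a single stream; repartition then
// redistributes the received blocks so downstream work is balanced across
// nbExecutors partitions instead of staying pinned to the receiver nodes.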
var outside = 0L // Driver-side running total across batches.
unifiedStream
  .map(JsonParser.to(classOf[Event], _)) // JsonParser and Event are project-specific.
  .foreachRDD(rdd =>
  {
    // Fresh accumulators per batch: acc counts records, partIndex counts partitions.
    val acc = sparkContext.accumulator(0L, "Accumulator")
    val partIndex = sparkContext.accumulator(0, "Partition Index")
    rdd.foreachPartition(partitionData =>
    {
      var counter = 0L
      partitionData.foreach(_ => counter += 1L)
      acc += counter
      partIndex += 1
    })
    outside += acc.value
    rdd.unpersist() // Just to be sure.
    println(outside + " : " + acc.value + " : " + acc.localValue + " : " + partIndex.value + " : " + partIndex.localValue)
  })
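// foreachRDD runs on the driver, so reading acc.value here is safe; on the driver
// localValue returns the same merged result, which is why each value/localValue
// pair printed above should match. outside keeps growing because it outlives the
// per-batch accumulators.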
context.start()
context.awaitTermination()
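// Hypothetical launch, assuming the snippet lives in an object named
// KafkaSparkIntegration and Spark 1.x with spark-streaming-kafka on the classpath:
//   spark-submit --class KafkaSparkIntegration \
//     --conf spark.executor.instances=4 kafka-spark-integration.jar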