Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- package streamkafka
- import kafka.common.TopicAndPartition
- import kafka.message.MessageAndMetadata
- import kafka.serializer.StringDecoder
- import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils}
- import org.I0Itec.zkclient.ZkClient
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.dstream.InputDStream
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import org.apache.spark.streaming.kafka._
object SparkStreamAndKafka {

  /** Entry point: consumes topic `test01` via the Kafka direct stream (receiver-less)
    * API, restoring per-partition offsets from ZooKeeper on startup and writing them
    * back after each processed batch (at-least-once delivery semantics).
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(2))
    val groupId = "123"
    val topic = "test01"
    val kafkaParam = Map(
      "metadata.broker.list" -> "hadoop1:9092",
      "group.id" -> groupId
    )
    val zkList = "hadoop1:2181,hadoop2:2181,hadoop3:2181"
    // ZK directory layout used to persist per-partition offsets for this group/topic.
    val zkDir = new ZKGroupTopicDirs(groupId, topic)
    val offsetDir = zkDir.consumerOffsetDir
    val zkClient = new ZkClient(zkList)
    // One ZK child node per partition that already has a stored offset.
    val children = zkClient.countChildren(offsetDir)

    var fromOffset: Map[TopicAndPartition, Long] = Map()
    var dStream: InputDStream[(String, String)] = null
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

    if (children > 0) {
      // Offsets exist in ZK: resume each partition from its stored offset.
      for (i <- 0 until children) {
        val partitionOffset: String = zkClient.readData[String](s"${offsetDir}/${i}")
        fromOffset += (new TopicAndPartition(topic, i) -> partitionOffset.toLong)
      }
      dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParam, fromOffset, messageHandler)
    } else {
      // First run: no saved offsets, start from the consumer's default position.
      dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParam, topic.split(",").toSet)
    }

    // BUG FIX: the original declared an immutable, always-empty `val ranges` and
    // discarded the offsetRanges computed inside transform(), so the commit loop
    // below iterated over nothing and offsets were never written to ZooKeeper.
    // Capture the ranges here instead.
    var offsetRanges = Array.empty[OffsetRange]

    dStream.transform { rdd =>
      // Must be applied to the direct stream's RDD before any shuffling operation,
      // otherwise the HasOffsetRanges cast fails.
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(_._2)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          partition.foreach(record => println(record))
        }
        // Persist offsets only after the batch has been processed (at-least-once).
        for (o <- offsetRanges) {
          // BUG FIX: write path now matches the read path (`$offsetDir/$partition`);
          // the original dropped the '/' separator, writing to a different node.
          val newZkPath = s"${offsetDir}/${o.partition}"
          // BUG FIX: store untilOffset (the next offset to consume), not fromOffset;
          // storing fromOffset would reprocess the whole last batch on every restart.
          ZkUtils.updatePersistentPath(zkClient, newZkPath, o.untilOffset.toString)
        }
      }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
Add Comment
Please, Sign In to add comment