Guest User

Untitled

a guest
Jun 24th, 2018
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.39 KB | None | 0 0
  1. package streamkafka
  2.  
  3. import kafka.common.TopicAndPartition
  4. import kafka.message.MessageAndMetadata
  5. import kafka.serializer.StringDecoder
  6. import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils}
  7. import org.I0Itec.zkclient.ZkClient
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.streaming.dstream.InputDStream
  10. import org.apache.spark.streaming.{Seconds, StreamingContext}
  11. import org.apache.spark.streaming.kafka._
  12.  
  /**
   * Spark Streaming job that consumes a Kafka topic through the direct (receiver-less)
   * stream API and keeps its consumer offsets in ZooKeeper.
   *
   * On start-up the job looks under the group's offset directory in ZooKeeper. If offsets
   * were stored there by a previous run, the stream resumes from them; otherwise the stream
   * is created from the topic name alone. After each batch has been printed, the end offset
   * of every partition range in that batch is written back to ZooKeeper.
   */
  object SparkStreamAndKafka {

    /**
     * Entry point: builds the streaming context, creates the Kafka direct stream, prints
     * every message value and persists the processed offsets per partition.
     *
     * @param args command-line arguments (not used)
     */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("test")
      val ssc = new StreamingContext(conf, Seconds(2))

      val groupId = "123"
      val topic = "test01"
      val kafkaParam = Map(
        "metadata.broker.list" -> "hadoop1:9092",
        "group.id" -> groupId
      )

      val zkList = "hadoop1:2181,hadoop2:2181,hadoop3:2181"

      // ZooKeeper directory helper for this group/topic, used to store the offsets.
      val zkDir = new ZKGroupTopicDirs(groupId, topic)
      // Directory under which one child node per partition holds that partition's offset.
      val offsetDir = zkDir.consumerOffsetDir

      // ZooKeeper client used both to read the stored offsets and to write new ones.
      val zkClient = new ZkClient(zkList)

      try {
        // Number of child nodes under the offset directory; 0 means nothing was stored yet.
        val children = zkClient.countChildren(offsetDir)

        val dStream: InputDStream[(String, String)] =
          if (children > 0) {
            // Resume: read the stored offset of each partition node 0 .. children-1.
            // NOTE(review): assumes the child nodes are named exactly 0..children-1 — confirm.
            val fromOffset: Map[TopicAndPartition, Long] = (0 until children).map { i =>
              val partitionOffset: String = zkClient.readData[String](s"$offsetDir/$i")
              new TopicAndPartition(topic, i) -> partitionOffset.toLong
            }.toMap

            // Turns each Kafka record into a (topic, message) pair.
            val messageHandler =
              (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
              ssc, kafkaParam, fromOffset, messageHandler)
          } else {
            // No stored offsets: create the stream from the topic set only.
            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
              ssc, kafkaParam, topic.split(",").toSet)
          }

        // Offset ranges of the batch currently being processed. They must be captured in the
        // first transformation on the direct stream, while the RDD still exposes
        // HasOffsetRanges, and are then read again in foreachRDD below.
        var offsetRanges = Array.empty[OffsetRange]

        dStream.transform { rdd =>
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }.map(msg => msg._2)
          .foreachRDD { rdd =>
            rdd.foreachPartition { partition =>
              partition.foreach(record => println(record))
            }

            // Persist the offsets only after the batch has been processed. The end offset
            // (untilOffset) is stored so a restart resumes right after this batch, and the
            // path uses the same "<offsetDir>/<partition>" layout that is read at start-up.
            for (o <- offsetRanges) {
              val newZkPath = s"$offsetDir/${o.partition}"
              ZkUtils.updatePersistentPath(zkClient, newZkPath, o.untilOffset.toString)
            }
          }

        ssc.start()
        ssc.awaitTermination()
      } finally {
        // Release the streaming context and the ZooKeeper connection even on failure.
        ssc.stop()
        zkClient.close()
      }
    }

  }
Add Comment
Please, Sign In to add comment