// bug: restoring from a checkpoint reports: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to KafkaSink
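//
// Likely cause (my reading, not confirmed in the original paste): broadcast
// variables cannot be recovered from a Spark Streaming checkpoint, a documented
// limitation of checkpointing. When the context is rebuilt via
// StreamingContext.getOrCreate, the foreachRDD closure is deserialized with a
// stale broadcast reference, which appears to get rebound to one of Spark's
// internal broadcasts (the Hadoop configuration wrapped in
// SerializableConfiguration), hence the cast failure. A workaround sketch is
// appended after SimpleApp below.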

/*
// build.sbt

name := "hello" // project name
version := "0.0.1-SNAPSHOT" // version number
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.1"
*/
import java.util.Properties

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Serializable wrapper around a lazily created KafkaProducer, so the producer
// itself is only instantiated on the executor that actually sends messages.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
        producer.close()
      }

      producer
    }
    new KafkaSink(f)
  }
}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val checkpointDir = if (args.length >= 1) "hdfs://10.2.35.117:9000/spark-cp" else "./spark-cp"

    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setMaster(if (args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")

      val ssc = new StreamingContext(conf, Seconds(2)) // new context
      ssc.checkpoint(checkpointDir)

      val brokers = "localhost:9092"
      val groupId = "gtest"
      val topics = "test31"
      // Create a direct Kafka stream for the given brokers and topics
      val topicsSet = topics.split(",").toSet

      val kafkaParams = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
        ConsumerConfig.GROUP_ID_CONFIG -> groupId,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

      val messages = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
      val lines = messages.map(_.value)
      val words = lines.flatMap(_.split(" "))
      val pairs = words.map((_, 1))

      val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

      // Running word count: fold the new count into the state and emit the total
      val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
        val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
        val output = (word, sum)
        state.update(sum)
        output
      }
      val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))

      val serializer = "org.apache.kafka.common.serialization.StringSerializer"
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", serializer)
      props.put("value.serializer", serializer)
      val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(props))

      stateCounter.foreachRDD { rdd =>
        rdd.foreach { message =>
          val (word, num) = message
          // when restoring from the checkpoint, the line below throws
          // "SerializableConfiguration cannot be cast to KafkaSink"
          kafkaSink.value.send("test31-out", f"$word%s->$num%d")
        }
      }
      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")

    ssc.start()
    ssc.awaitTermination()
  }
}
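
// Workaround sketch (my addition, not part of the original repro): since
// broadcast variables cannot be recovered from a Spark Streaming checkpoint,
// keep the KafkaSink in a lazily initialized per-JVM singleton and fetch it
// inside the output action instead of capturing a broadcast in the checkpointed
// closure. This mirrors the lazily-instantiated-singleton pattern from the
// Spark Streaming programming guide; KafkaSinkHolder is a hypothetical helper.
object KafkaSinkHolder {
  @volatile private var instance: KafkaSink = _

  // Double-checked locking so each executor JVM builds the sink at most once.
  def getInstance(props: Properties): KafkaSink = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = KafkaSink(props)
        }
      }
    }
    instance
  }
}

// Usage inside functionToCreateContext, replacing the broadcast. props is a
// java.util.Properties, which is Serializable, so the closure ships cleanly:
//
//   stateCounter.foreachRDD { rdd =>
//     rdd.foreachPartition { partition =>
//       val sink = KafkaSinkHolder.getInstance(props) // created once per executor JVM
//       partition.foreach { case (word, num) =>
//         sink.send("test31-out", f"$word%s->$num%d")
//       }
//     }
//   }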