// Bug: restoring this job from its checkpoint fails with
//   java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration
//   cannot be cast to KafkaSink
/*
// build.sbt
name := "hello" // project name
version := "0.0.1-SNAPSHOT" // version number
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.1"
*/
import java.util.Properties

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
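// Note: KafkaProducer itself is not serializable. The wrapper below therefore
// takes a producer factory rather than a producer: the KafkaSink instance can
// be shipped to executors, and the lazy val builds the real producer on first
// use inside each executor JVM.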
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()

  // Fire-and-forget: the Future returned by producer.send is ignored, so
  // delivery failures are not surfaced here.
  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)
      // Close the producer when the executor JVM shuts down.
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(f)
  }
}
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val checkpointDir = if (args.length >= 1) "hdfs://10.2.35.117:9000/spark-cp" else "./spark-cp"

    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf()
        .setMaster(if (args.length >= 1) args(0) else "local[2]")
        .setAppName("NetworkWordCount")
      val ssc = new StreamingContext(conf, Seconds(2)) // new context
      ssc.checkpoint(checkpointDir)

      val brokers = "localhost:9092"
      val groupId = "gtest"
      val topics = "test31"

      // Create a direct Kafka stream for the given brokers and topics.
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
        ConsumerConfig.GROUP_ID_CONFIG -> groupId,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
      val messages = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

      val lines = messages.map(_.value)
      val words = lines.flatMap(_.split(" "))
      val pairs = words.map((_, 1))

      // Running word count kept in mapWithState, seeded with an initial RDD.
      val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
      val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
        val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
        val output = (word, sum)
        state.update(sum)
        output
      }
      val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))

      val serializer = "org.apache.kafka.common.serialization.StringSerializer"
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", serializer)
      props.put("value.serializer", serializer)
      val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(props))

      stateCounter.foreachRDD { rdd =>
        rdd.foreach { message =>
          val (word, num) = message
          // On restore from the checkpoint, the line below throws
          // "SerializableConfiguration cannot be cast to KafkaSink".
          kafkaSink.value.send("test31-out", f"$word%s->$num%d")
        }
      }
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")
    ssc.start()
    ssc.awaitTermination()
  }
}
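/*
 * Why this happens (and a possible workaround): the Spark Streaming programming
 * guide notes that broadcast variables cannot be recovered from a checkpoint.
 * The kafkaSink broadcast is created inside functionToCreateContext() and
 * captured in the checkpointed foreachRDD closure; after a restart that stale
 * reference ends up resolving to one of Spark's own internal broadcasts (a
 * SerializableConfiguration), hence the ClassCastException. A common fix is to
 * drop the broadcast entirely and keep one KafkaSink per executor JVM in a
 * lazily initialized singleton. The sketch below is untested against this
 * exact job; KafkaSinkHolder is a hypothetical name introduced here.
 */
object KafkaSinkHolder {
  @volatile private var instance: KafkaSink = _

  // Rebuilds the producer Properties on the executor instead of shipping
  // them from the driver through the checkpointed closure.
  private def producerProps(): Properties = {
    val serializer = "org.apache.kafka.common.serialization.StringSerializer"
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", serializer)
    props.put("value.serializer", serializer)
    props
  }

  // Double-checked locking so each executor JVM creates at most one sink.
  def getInstance(): KafkaSink = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = KafkaSink(producerProps())
        }
      }
    }
    instance
  }
}

// foreachRDD would then reference the singleton instead of the broadcast:
//
//   stateCounter.foreachRDD { rdd =>
//     rdd.foreachPartition { partition =>
//       val sink = KafkaSinkHolder.getInstance()
//       partition.foreach { case (word, num) =>
//         sink.send("test31-out", f"$word%s->$num%d")
//       }
//     }
//   }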