Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
// Import dependencies and create kafka params as in Create Direct Stream.

// Each OffsetRange pins one slice of a topic-partition:
// (topic, partition, inclusive starting offset, exclusive ending offset).
val offsetRanges = Array(
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

// Build a batch RDD over exactly the records covered by offsetRanges.
// PreferConsistent spreads partitions evenly across available executors.
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
# Signature: pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)
# Read records 0..9 (untilOffset is exclusive) from partition 0 of 'topic'.
offset_ranges = [OffsetRange('topic', 0, 0, 10)]

# Materialize those records as a batch RDD.
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offset_ranges)
# Resume a direct stream from a known position: map each TopicAndPartition
# to the offset consumption should start from.
start_offsets = {TopicAndPartition(var_topic_src_name, var_partition): var_offset}
print(start_offsets)

kvs = KafkaUtils.createDirectStream(
    ssc,
    [var_topic_src_name],
    var_kafka_parms_src,
    valueDecoder=serializer.decode_message,
    fromOffsets=start_offsets,
)
Add Comment
Please sign in to add a comment