
Untitled

a guest, Jan 18th, 2019
// Scala: import dependencies and create kafkaParams as in Create Direct Stream

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
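Each OffsetRange above uses an inclusive starting offset and an exclusive ending offset, so a range of (0, 100) covers exactly 100 records. A minimal pure-Python sketch of that arithmetic (no Spark required; the tuples mirror the (topic, partition, fromOffset, untilOffset) arguments above):

```python
def offset_range_count(from_offset, until_offset):
    """Records covered by one OffsetRange: start inclusive, end exclusive."""
    return until_offset - from_offset

# The same two ranges as the Scala example: partitions 0 and 1 of topic "test".
ranges = [("test", 0, 0, 100), ("test", 1, 0, 100)]
total = sum(offset_range_count(f, u) for _, _, f, u in ranges)
# total is 200 — the RDD created from these ranges holds 200 records.
```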
# Python: pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)

fromOffset = 0
untilOffset = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, fromOffset, untilOffset)
offsets = [offset]

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
topicpartition = TopicAndPartition(var_topic_src_name, var_partition)
fromoffset = {topicpartition: var_offset}
print(fromoffset)

kvs = KafkaUtils.createDirectStream(ssc,
                                    [var_topic_src_name],
                                    var_kafka_parms_src,
                                    valueDecoder=serializer.decode_message,
                                    fromOffsets=fromoffset)
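The fromOffsets argument is a dict mapping each TopicAndPartition to the offset where consumption should start. A hedged sketch of building that mapping for several partitions at once — plain tuples stand in for TopicAndPartition here so the snippet runs without pyspark; in real code you would construct TopicAndPartition(topic, p) instead:

```python
def build_from_offsets(topic, partition_offsets):
    """Build the fromOffsets-style dict for createDirectStream.

    partition_offsets: {partition_number: starting_offset}
    Keys are (topic, partition) tuples standing in for TopicAndPartition.
    """
    return {(topic, p): off for p, off in partition_offsets.items()}

# Start partition 0 at offset 5 and partition 1 at offset 7.
from_offsets = build_from_offsets("my_topic", {0: 5, 1: 7})
```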