// Scala: read a batch of Kafka records as an RDD over explicit offset ranges.
// Import dependencies and create kafkaParams as in Create Direct Stream.
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

# Python (pyspark.streaming.kafka, the Kafka 0.8 integration): the same batch
# read. OffsetRange(topic, partition, fromOffset, untilOffset) marks an
# inclusive starting offset and an exclusive ending offset in one partition.
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

fromOffset = 0
untilOffset = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, fromOffset, untilOffset)
offsets = [offset]

# kafkaParams must at least name the brokers; this address is an assumption.
kafkaParams = {"metadata.broker.list": "localhost:9092"}
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
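
# A minimal sketch of consuming the result, assuming the messages decode as
# UTF-8 strings (the default decoders): the RDD holds (key, value) pairs.
for key, value in kafkaRDD.collect():
    print(key, value)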

# Python: create a direct stream that starts from explicit offsets rather
# than the latest ones. The var_* names and serializer.decode_message are
# defined elsewhere in the surrounding application.
from pyspark.streaming.kafka import TopicAndPartition

topic_partition = TopicAndPartition(var_topic_src_name, var_partition)
from_offsets = {topic_partition: var_offset}
print(from_offsets)

kvs = KafkaUtils.createDirectStream(ssc,
                                    [var_topic_src_name],
                                    var_kafka_parms_src,
                                    valueDecoder=serializer.decode_message,
                                    fromOffsets=from_offsets)
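
# A minimal sketch of driving the stream from here, assuming ssc was created
# with a batch interval earlier: print each batch and block until stopped.
kvs.pprint()
ssc.start()
ssc.awaitTermination()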