Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- pos = 12345678
- from kafka import KafkaConsumer
- from kafka import TopicPartition
- import time
- topic = TopicPartition('test',0)
- # Create connection to Kafka broker
- consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='logstash')
- # Assign partition list for the consumer
- consumer.assign([topic])
- # Get position of next reccord
- position = consumer.position(topic)
- # Specify to fetch the last message
- # consumer.seek(topic, position - 1)
- consumer.seek(topic, pos)
- # commit offset -- http://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html
- consumer.commit()
- # Close connection to broker
- consumer.close()
Advertisement
Add Comment
Please, Sign In to add comment