Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import threading, logging, time
- import uuid
- from kafka import KafkaConsumer
class Consumer(threading.Thread):
    """Background thread that reads one Kafka topic and prints a running message count."""

    # Marks every instance as a daemon thread, so it does not keep the
    # interpreter alive by itself.
    daemon = True

    def run(self):
        """Subscribe to the topic and print how many messages have been read so far.

        After each message the thread pauses for 100 ms. The consumer is
        closed when iteration stops, whether normally or through an exception.
        """
        consumer = KafkaConsumer(
            bootstrap_servers='fid-kfk-001:9092',
            auto_offset_reset='earliest',
            group_id='marcotest001',
            # kafka-python documents client_id as a str, so pass the UUID's
            # string form rather than the UUID object itself.
            client_id=str(uuid.uuid4()),
            max_poll_records=10)
        try:
            consumer.subscribe(['quiddities-editorial-en-201501'])
            # Only the running total is reported; the message payload is unused.
            for count, _ in enumerate(consumer, start=1):
                print(count)
                time.sleep(0.1)
        finally:
            # Release sockets and leave the consumer group cleanly.
            consumer.close()
def main():
    """Start the consumer thread(s), then keep the main thread alive indefinitely."""
    workers = [Consumer()]
    for worker in workers:
        worker.start()

    # Never returns: wake once per second so the process keeps running
    # while the worker threads do their job.
    while True:
        time.sleep(1)
if __name__ == "__main__":
    # Script entry point: set up INFO-level logging with timestamp, logger
    # name, thread id, level and process id on every record, then run main().
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
    )
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement