Advertisement
Guest User

Untitled

a guest
Jan 19th, 2017
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.86 KB | None | 0 0
  1. import threading, logging, time
  2. import uuid
  3.  
  4. from kafka import KafkaConsumer
  5.  
  6.  
  7. class Consumer(threading.Thread):
  8. daemon = True
  9.  
  10. def run(self):
  11. consumer = KafkaConsumer(
  12. bootstrap_servers='fid-kfk-001:9092',
  13. auto_offset_reset='earliest',
  14. group_id='marcotest001',
  15. client_id=uuid.uuid4(),
  16. max_poll_records=10)
  17. consumer.subscribe(['quiddities-editorial-en-201501'])
  18.  
  19. count = 0
  20. for message in consumer:
  21. count += 1
  22. print (count)
  23. time.sleep(0.1)
  24.  
  25.  
  26. def main():
  27. threads = [Consumer()]
  28.  
  29. for t in threads:
  30. t.start()
  31.  
  32. while True:
  33. time.sleep(1)
  34.  
  35.  
  36. if __name__ == "__main__":
  37. logging.basicConfig(format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
  38. level=logging.INFO)
  39. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement