Advertisement
yiorgos

confluent-kafka-consumer

Mar 26th, 2021
809
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 0.83 KB | None | 0 0
  # pip install confluent-kafka
#
# Minimal confluent-kafka consumer: subscribes to a single topic over
# SASL_SSL and prints every message it receives, numbering them in order
# of arrival. Runs until interrupted (Ctrl-C).

from confluent_kafka import Consumer, KafkaError

topic = 'CLIENT_NAME-tests'

c = Consumer({
    'bootstrap.servers': 'server.name:9094',
    'group.id': 'mygroup',

    # Set to False to make sure you do not update offsets
    # (switch to True to have offsets committed automatically).
    'enable.auto.commit': False,

    # Set to earliest to start --from-beginning.
    'auto.offset.reset': 'earliest',

    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': 'ca-cert',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'CLIENT_USERNAME',
    'sasl.password': 'XXXX',
})

c.subscribe([topic])

n = 0  # running count of successfully received messages
try:
    while True:
        # Wait up to one second for a message. A zero timeout returns
        # immediately and turns this loop into a busy-wait that pins a CPU
        # core whenever the topic is idle.
        msg = c.poll(1.0)

        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue

        if msg.error():
            # error() must be *called*; formatting the bare attribute prints
            # the bound-method repr instead of the actual error.
            print("Consumer error: {}".format(msg.error()))
            continue

        n += 1
        print("message: {} | {}".format(n, msg.value()))
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this script; fall through to cleanup.
    pass
finally:
    # Always close the consumer, even on an unexpected exception. The
    # original close() call sat after an infinite loop and was unreachable.
    c.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement