Advertisement
Guest User

Untitled

a guest
Mar 22nd, 2019
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.98 KB | None | 0 0
  1. from kafka import KafkaProducer
  2. def publish_log_messages(producer_instance, topic_name, value):
  3. try:
  4. value_serializer = value.encode()
  5. producer_instance.send(topic_name, value=value_serializer)
  6. producer_instance.flush()
  7.  
  8. except Exception as ex:
  9. print('Exception in publishing message')
  10. print(str(ex))
  11. def connect_kafka_producer():
  12. try:
  13. producer = KafkaProducer(bootstrap_servers=['wn01.itversity.com:6667','wn02.itversity.com:6667','wn03.itversity.com:6667'],
  14. api_version=(0, 10))
  15. except Exception as ex:
  16. print('Exception while connecting Kafka')
  17. print(str(ex))
  18. finally:
  19. return producer
  20.  
  21. if __name__ == '__main__':
  22. kafka_producer=connect_kafka_producer()
  23. with open('/opt/gen_logs/logs/access.log') as f:
  24. logMessages = f.read().splitlines()
  25. for message in logMessages:
  26. publish_log_messages(kafka_producer,'retail_multi',message)
  27. kafka_producer.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement