Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from kafka import KafkaProducer
def publish_log_messages(producer_instance, topic_name, value):
    """Encode *value* to bytes, send it to *topic_name*, then flush.

    Args:
        producer_instance: Object exposing ``send(topic, value=...)`` and
            ``flush()``.
        topic_name: Topic the message is sent to.
        value: Text message; it is encoded with ``str.encode()`` before
            sending.

    Any exception raised while encoding, sending or flushing is reported on
    stdout and not re-raised, so the function always returns ``None``.
    """
    try:
        payload = value.encode()
        producer_instance.send(topic_name, value=payload)
        # Flush after every message so it is handed off before returning.
        producer_instance.flush()
    except Exception as err:
        print('Exception in publishing message')
        print(str(err))
def connect_kafka_producer():
    """Create a ``KafkaProducer`` for the three configured bootstrap brokers.

    Returns:
        The constructed ``KafkaProducer``, or ``None`` when construction
        raised an ``Exception`` (the error is printed to stdout instead of
        being propagated).
    """
    # Bind the name up front: if the constructor raises, the function must
    # return None rather than fail with UnboundLocalError at the return.
    producer = None
    try:
        producer = KafkaProducer(
            bootstrap_servers=[
                'wn01.itversity.com:6667',
                'wn02.itversity.com:6667',
                'wn03.itversity.com:6667',
            ],
            api_version=(0, 10),
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    # Return after the try statement, not from a ``finally`` clause: a
    # ``return`` inside ``finally`` would also discard KeyboardInterrupt and
    # SystemExit raised while connecting.
    return producer
if __name__ == '__main__':
    # Script entry point: publish every line of the access log to the
    # 'retail_multi' topic, one message per line.
    kafka_producer = connect_kafka_producer()
    if kafka_producer is None:
        # No producer was created, so there is nothing to publish with or to
        # close; stop with a non-zero exit status.
        raise SystemExit(1)
    try:
        with open('/opt/gen_logs/logs/access.log') as f:
            logMessages = f.read().splitlines()
        for message in logMessages:
            publish_log_messages(kafka_producer, 'retail_multi', message)
    finally:
        # Close the producer even if opening or reading the log file fails.
        kafka_producer.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement