Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from kafka import KafkaProducer as kp
- import json
- import sys
- import datetime
- config = json.loads(open("kafka-config/conf.json").read())
- producer = kp(bootstrap_servers=[config["kafka"]])
- sample_data = {
- "EVENT_TYPE":"AUDITD",
- "EVENT_DATA":json.dumps({"KILL":"ME"}),
- "@timestamp":str(datetime.datetime.now()),
- }
- data = json.dumps(sample_data);
- future = producer.send(config["producer"],data.encode('utf8'));
- try:
- record_metadata = future.get(timeout=120)
- print (record_metadata.topic)
- print (record_metadata.partition)
- print (record_metadata.offset)
- except Exception as e:
- print(e)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement