Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from time import sleep
- from json import dumps
- from kafka import KafkaProducer
- filename="contohfile.txt"
- server="localhost"
- port="9092"
- topic="test1"
- producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(server,port)],
- value_serializer=lambda x:
- dumps(x).encode('utf-8'))
- producer.send(topic, u'BEGIN')
- f = open(filename, "r")
- for row in f:
- producer.send(topic, value=row)
- print(row)
- sleep(1)
- producer.send(topic, u'END')
- sleep(1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement