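from kafka import KafkaProducer

# Producer is created once on the driver; keys and values are str, encoded to UTF-8 bytes
producer = KafkaProducer(bootstrap_servers='abc.def.com:9092', value_serializer=str.encode, key_serializer=str.encode)

# Push the processed events to Kafka
def push_back_to_kafka(processed_events):
    # collect() pulls the whole RDD to the driver, so this only suits small batches
    list_of_processed_events = processed_events.collect()
    producer.send('output_event', value=str(list_of_processed_events))

# `lines` is the DStream defined elsewhere; run the push once per micro-batch
lines.foreachRDD(push_back_to_kafka)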