Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.Properties;
- public class ProducerTopic {
- ProducerTopic(){}
- public static void set_in_topic(String topic, String string){
- // create instance for properties to access producer configs
- Properties props = new Properties();
- //Assign localhost id
- props.put("bootstrap.servers", "localhost:9092");
- //Set acknowledgements for producer requests.
- props.put("acks", "all");
- //If the request fails, the producer can automatically retry,
- props.put("retries", 0);
- //Specify buffer size in config
- props.put("batch.size", 16384);
- //Reduce the no of requests less than 0
- props.put("linger.ms", 1);
- //The buffer.memory controls the total amount of memory available to the producer for buffering.
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props);
- producer.send(new ProducerRecord<String, String>(topic, string.split("#")[3] , string));
- System.out.println("Message sent successfully to topic " + topic);
- producer.close();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement