Advertisement
AlbeChero

MyKafkaProducer

Mar 29th, 2020
188
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.33 KB | None | 0 0
  1. package com.dynamic.kafka.model;
  2.  
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  5. import org.springframework.util.concurrent.ListenableFuture;
  6. import org.springframework.kafka.support.SendResult;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8.  
  9. import java.util.Map;
  10.  
  11. import static org.apache.kafka.clients.producer.ProducerConfig.*;
  12.  
  13. public class MyKafkaProducer {
  14.  
  15.     private final String brokerAddress;
  16.     private final String topic;
  17.  
  18.     public MyKafkaProducer(String brokerAddress, String topic) {
  19.         this.brokerAddress = brokerAddress;
  20.         this.topic = topic;
  21.     }
  22.  
  23.     public ListenableFuture<SendResult<String, String>> send(String message) {
  24.         return createTemplate(brokerAddress).send(topic, message);
  25.     }
  26.  
  27.     private Map<String, Object> producerConfig(String brokerAddress) {
  28.         return Map.of(
  29.                 BOOTSTRAP_SERVERS_CONFIG, brokerAddress
  30.         );
  31.     }
  32.  
  33.     private KafkaTemplate<String, String> createTemplate(String brokerAddress) {
  34.         return new KafkaTemplate<>(
  35.                 new DefaultKafkaProducerFactory<>(
  36.                         producerConfig(brokerAddress),
  37.                         new StringSerializer(),
  38.                         new StringSerializer()));
  39.     }
  40. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement