SHARE
TWEET

Untitled

a guest Mar 20th, 2019 49 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  // Publish every element of `messages` to KAFKA_INPUT_TOPIC inside one Kafka transaction.
  // try-with-resources closes the producer when the block exits, normally or exceptionally.
  try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
      producer.initTransactions();
      producer.beginTransaction();
      for (String message : messages) {
          // Only topic and value are supplied; no record key is set.
          producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message));
      }
      producer.commitTransaction();
  }
  8.      
  9. private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
  10.         return new KafkaProducer<>(
  11.             ImmutableMap.of(
  12.                 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
  13.                 ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
  14.                 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
  15.                 ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
  16.             ),
  17.             new VoidSerializer(),
  18.             new StringSerializer());
  19.     }
  20.      
  21. private static final String KAFKA_VERSION = "4.1.1";
  22.  
  23. @Rule
  24. public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
  25.     .withEmbeddedZookeeper();
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top