Advertisement
Guest User

Untitled

a guest
Mar 20th, 2019
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.08 KB | None | 0 0
  // Publish every entry of `messages` to KAFKA_INPUT_TOPIC inside a single
  // producer transaction. The try-with-resources statement closes the producer
  // when the block exits, whether or not the commit was reached.
  try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
    producer.initTransactions();
    producer.beginTransaction();
    for (String message : messages) {
      // Key-less record: only topic and value are supplied.
      producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message));
    }
    producer.commitTransaction();
  }
  8.  
  9. private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
  10. return new KafkaProducer<>(
  11. ImmutableMap.of(
  12. ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
  13. ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
  14. ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
  15. ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
  16. ),
  17. new VoidSerializer(),
  18. new StringSerializer());
  19. }
  20.  
  21. private static final String KAFKA_VERSION = "4.1.1";
  22.  
  23. @Rule
  24. public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
  25. .withEmbeddedZookeeper();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement