Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Sends every element of `messages` to KAFKA_INPUT_TOPIC inside one producer
// transaction: initTransactions -> beginTransaction -> send(...) per message ->
// commitTransaction. The producer is declared in try-with-resources, so it is
// closed when the block exits, whether normally or via an exception.
- try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
- producer.initTransactions();
- producer.beginTransaction();
// Each record is built from the topic and the message only; the value returned
// by send(...) is not inspected here.
- Arrays.stream(messages).forEach(
- message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
- producer.commitTransaction();
- }
/**
 * Builds a new {@code KafkaProducer<Void, String>} configured for transactional use.
 *
 * <p>The configuration map sets the bootstrap servers from the argument, enables
 * idempotence, and assigns freshly generated random UUIDs as both the client id
 * and the transactional id, so every call yields a producer with new ids.
 * A {@code VoidSerializer} and a {@code StringSerializer} are passed to the
 * constructor alongside the configuration.
 *
 * @param kafkaContainerBootstrapServers value used for
 *     {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}
 * @return the newly constructed producer; this method does not close it
 */
- private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
- return new KafkaProducer<>(
- ImmutableMap.of(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
- ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
- ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
- ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
- ),
- new VoidSerializer(),
- new StringSerializer());
- }
// Version string handed to the KafkaContainer constructor below.
// NOTE(review): presumably this is the container image tag rather than an
// Apache Kafka release number -- confirm against the Testcontainers version in use.
- private static final String KAFKA_VERSION = "4.1.1";
// Kafka container field annotated with @Rule and configured via withEmbeddedZookeeper().
// NOTE(review): presumably a JUnit 4 rule that manages the container around each
// test -- confirm from the file's imports, which are not visible here.
- @Rule
- public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
- .withEmbeddedZookeeper();
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement