Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package test;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.Properties;
- class kafkatest {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-world-producer");
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- KafkaProducer producer = new KafkaProducer(props);
- producer.initTransactions();
- producer.beginTransaction();
- producer.send(new ProducerRecord<>("topic", "hello", "world"));
- producer.flush();
- producer.abortTransaction();
- producer.close();
- }
- }
- --- ~ » kafka-console-consumer --bootstrap-server localhost:9092
- --topic topic
- --from-beginning
- --consumer-property isolation.level=read_committed
- world
Add Comment
Please, Sign In to add comment