Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @SpringBootApplication
- @EnableTransactionManagement
- public class KafkaTransactionMysteryApplication {
- public static void main(String[] args) {
- SpringApplication.run(KafkaTransactionMysteryApplication.class, args);
- }
- @Bean
- @Primary
- public JpaTransactionManager transactionManager() {
- return new JpaTransactionManager();
- }
- @Bean
- public ChainedKafkaTransactionManager chainedTxM(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
- return new ChainedKafkaTransactionManager(kafka, jpa);
- }
- @KafkaListener(topics = "trans-topic")
- @Transactional(propagation = Propagation.REQUIRED, transactionManager = "chainedTxM", rollbackFor = Exception.class)
- public void listen(ConsumerRecord record) throws Exception {
- System.out.println(record.value());
- if (true) {
- throw new Exception("Force rollback");
- }
- }
- }
# Producer and consumer are both pointed at the same local broker address.
- spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
- spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
# Transaction id prefix for the producer.
# NOTE(review): presumably this is what enables transactional producers - confirm against the Spring Boot docs.
- spring.kafka.producer.transaction-id-prefix=mytrans
# Record values are written with the String serializer.
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer group used by the listener.
- spring.kafka.consumer.group-id=trans-topic-grp1
# Passed through to the Kafka consumer as isolation.level.
# NOTE(review): presumably hides records from uncommitted/aborted transactions - confirm.
- spring.kafka.consumer.properties.isolation.level=read_committed
# Offset reset policy set to "earliest".
- spring.kafka.consumer.auto-offset-reset=earliest
# Listener acknowledgment mode set to RECORD; consumer auto-commit is turned off.
- spring.kafka.listener.ack-mode=RECORD
- spring.kafka.consumer.enable-auto-commit=false
Add Comment
Please sign in to add a comment.