Advertisement
alliano

spring kafka batch listener

Feb 7th, 2023 (edited)
1,087
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 9.13 KB | Source Code | 0 0
  1. // the class that i use
  2.  
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import org.apache.kafka.clients.admin.AdminClientConfig;
  6. import org.apache.kafka.clients.consumer.ConsumerConfig;
  7. import org.apache.kafka.clients.producer.ProducerConfig;
  8. import org.apache.kafka.common.serialization.StringSerializer;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.kafka.annotation.EnableKafka;
  12. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  13. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  14. import org.springframework.kafka.config.TopicBuilder;
  15. import org.springframework.kafka.core.ConsumerFactory;
  16. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  17. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  18. import org.springframework.kafka.core.KafkaAdmin;
  19. import org.springframework.kafka.core.KafkaTemplate;
  20. import org.springframework.kafka.core.ProducerFactory;
  21. import org.springframework.kafka.core.KafkaAdmin.NewTopics;
  22. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  23.  
  24.  
  25.  
  26.  
  27.  
  28.  
  29. //bean configuration for kafka consumer
  30. @Bean
  31.     public KafkaListenerContainerFactory
  32.         <ConcurrentMessageListenerContainer<String, String>> kafkaContainerListenerFactory() {
  33.             ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
  34.             factory.setConsumerFactory(consumerFactory());
  35.             return factory;
  36.     }
  37.  
  38.     @Bean
  39.     public ConsumerFactory<String, String> consumerFactory(){
  40.         return new DefaultKafkaConsumerFactory<String, String>(consumerConfig());
  41.     }
  42.  
  43.     @Bean
  44.     public Map<String, Object> consumerConfig() {
  45.         Map<String, Object> configMap = new HashMap<>();
  46.         configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  47.         configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
  48.         configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
  49.         return configMap;
  50.     }
  51.  
  52.     @Bean
  53.     public KafkaListenerContainerFactory<?> batchFactory() {
  54.         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
  55.         factory.setConsumerFactory(consumerFactory());
  56.         factory.setBatchListener(true);
  57.         return factory;
  58.     }
  59.  
  60.  
  61.  
  62. //when i add this method the error is ocure
  63. @KafkaListener(topics = {"example-for-batch-listener"}, id = "batchListener", containerFactory = "batchFactory")
  64.     public void batchListener(java.util.List<String> messageList){
  65.  
  66.     }
  67.  
  68.  
  69.  
  70.  
  71.  
  72.  
  73.  
  74.  
  75.  
  76.  
  77.  
  78.  
  79.  
  80.  
  81.  
  82. // error log when run the app
  83.  
  84. Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
  85. 2023-02-07T14:51:59.473+09:00 ERROR 5715 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed
  86.  
  87. org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
  88.         at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-6.0.4.jar:6.0.4]
  89.         at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-6.0.4.jar:6.0.4]
  90.         at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
  91.         at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-6.0.4.jar:6.0.4]
  92.         at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-6.0.4.jar:6.0.4]
  93.         at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:932) ~[spring-context-6.0.4.jar:6.0.4]
  94.         at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:587) ~[spring-context-6.0.4.jar:6.0.4]
  95.         at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[spring-boot-3.0.2.jar:3.0.2]
  96.         at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730) ~[spring-boot-3.0.2.jar:3.0.2]
  97.         at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:432) ~[spring-boot-3.0.2.jar:3.0.2]
  98.         at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-3.0.2.jar:3.0.2]
  99.         at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-3.0.2.jar:3.0.2]
  100.         at org.springframework.boot.SpringApplication.run(SpringApplication.java:1291) ~[spring-boot-3.0.2.jar:3.0.2]
  101.         at com.kafka.springkafka.SpringKafkaApplication.main(SpringKafkaApplication.java:10) ~[classes/:na]
  102.         at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[na:na]
  103.         at java.base/java.lang.reflect.Method.invoke(Method.java:577) ~[na:na]
  104.         at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-3.0.2.jar:3.0.2]
  105. Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  106.         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[kafka-clients-3.3.2.jar:na]
  107.         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[kafka-clients-3.3.2.jar:na]
  108.         at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-3.0.2.jar:3.0.2]
  109.         at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-3.0.2.jar:3.0.2]
  110.         at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-3.0.2.jar:3.0.2]
  111.         at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-3.0.2.jar:3.0.2]
  112.         at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-3.0.2.jar:3.0.2]
  113.         at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:849) ~[spring-kafka-3.0.2.jar:3.0.2]
  114.         at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:380) ~[spring-kafka-3.0.2.jar:3.0.2]
  115.         at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:531) ~[spring-kafka-3.0.2.jar:3.0.2]
  116.         at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:226) ~[spring-kafka-3.0.2.jar:3.0.2]
  117.         at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:531) ~[spring-kafka-3.0.2.jar:3.0.2]
  118.         at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:383) ~[spring-kafka-3.0.2.jar:3.0.2]
  119.         at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:328) ~[spring-kafka-3.0.2.jar:3.0.2]
  120.         at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-6.0.4.jar:6.0.4]
  121.         ... 16 common frames omitted
  122. Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
  123.         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405) ~[kafka-clients-3.3.2.jar:na]
  124.         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436) ~[kafka-clients-3.3.2.jar:na]
  125.         at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421) ~[kafka-clients-3.3.2.jar:na]
  126.         at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:710) ~[kafka-clients-3.3.2.jar:na]
  127.         ... 30 common frames omitted
  128.  
  129. [INFO] ------------------------------------------------------------------------
  130. [INFO] BUILD SUCCESS
  131. [INFO] ------------------------------------------------------------------------
  132. [INFO] Total time:  7.378 s
  133. [INFO] Finished at: 2023-02-07T14:51:59+09:00
  134. [INFO] ------------------------------------------------------------------------
  135. () L ) j ) s ) spring-kafka )
  136.  
  137.  
  138.  
  139.  
  140. // the error that i highlight
  141. Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement