Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // the class that i use
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.kafka.clients.admin.AdminClientConfig;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.config.TopicBuilder;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaAdmin;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
- import org.springframework.kafka.core.KafkaAdmin.NewTopics;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
- //bean configuration for kafka consumer
- @Bean
- public KafkaListenerContainerFactory
- <ConcurrentMessageListenerContainer<String, String>> kafkaContainerListenerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
- factory.setConsumerFactory(consumerFactory());
- return factory;
- }
- @Bean
- public ConsumerFactory<String, String> consumerFactory(){
- return new DefaultKafkaConsumerFactory<String, String>(consumerConfig());
- }
- @Bean
- public Map<String, Object> consumerConfig() {
- Map<String, Object> configMap = new HashMap<>();
- configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
- configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return configMap;
- }
- @Bean
- public KafkaListenerContainerFactory<?> batchFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
- factory.setConsumerFactory(consumerFactory());
- factory.setBatchListener(true);
- return factory;
- }
- //when i add this method the error is ocure
- @KafkaListener(topics = {"example-for-batch-listener"}, id = "batchListener", containerFactory = "batchFactory")
- public void batchListener(java.util.List<String> messageList){
- }
- // error log when run the app
- Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
- 2023-02-07T14:51:59.473+09:00 ERROR 5715 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
- org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
- at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-6.0.4.jar:6.0.4]
- at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-6.0.4.jar:6.0.4]
- at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
- at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-6.0.4.jar:6.0.4]
- at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-6.0.4.jar:6.0.4]
- at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:932) ~[spring-context-6.0.4.jar:6.0.4]
- at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:587) ~[spring-context-6.0.4.jar:6.0.4]
- at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[spring-boot-3.0.2.jar:3.0.2]
- at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730) ~[spring-boot-3.0.2.jar:3.0.2]
- at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:432) ~[spring-boot-3.0.2.jar:3.0.2]
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-3.0.2.jar:3.0.2]
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-3.0.2.jar:3.0.2]
- at org.springframework.boot.SpringApplication.run(SpringApplication.java:1291) ~[spring-boot-3.0.2.jar:3.0.2]
- at com.kafka.springkafka.SpringKafkaApplication.main(SpringKafkaApplication.java:10) ~[classes/:na]
- at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[na:na]
- at java.base/java.lang.reflect.Method.invoke(Method.java:577) ~[na:na]
- at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-3.0.2.jar:3.0.2]
- Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
- at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[kafka-clients-3.3.2.jar:na]
- at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[kafka-clients-3.3.2.jar:na]
- at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:849) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:380) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:531) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:226) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:531) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:383) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:328) ~[spring-kafka-3.0.2.jar:3.0.2]
- at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-6.0.4.jar:6.0.4]
- ... 16 common frames omitted
- Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
- at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405) ~[kafka-clients-3.3.2.jar:na]
- at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436) ~[kafka-clients-3.3.2.jar:na]
- at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421) ~[kafka-clients-3.3.2.jar:na]
- at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:710) ~[kafka-clients-3.3.2.jar:na]
- ... 30 common frames omitted
- [INFO] ------------------------------------------------------------------------
- [INFO] BUILD SUCCESS
- [INFO] ------------------------------------------------------------------------
- [INFO] Total time: 7.378 s
- [INFO] Finished at: 2023-02-07T14:51:59+09:00
- [INFO] ------------------------------------------------------------------------
- ( ⌁ ) L ) j ) s ) spring-kafka )
- // the error that i highlight
- Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement