Advertisement
Guest User

Untitled

a guest
Jun 14th, 2021
233
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.48 KB | None | 0 0
  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConsumerConfig
  4. {
  5.     @Value(value = "${kafka.bootstrapAddress}")
  6.     private String bootstrapAddress;
  7.  
  8.     @Value(value = "${general.topic.group.id}")
  9.     private String groupId;
  10.  
  11.     @Value(value = "${user.topic.group.id}")
  12.     private String userGroupId;
  13.    
  14.  
  15.  
  16.     @Bean
  17.     public ConsumerFactory<Integer, String>  consumerFactory() {
  18.         Map<String, Object> props = new HashMap<>();
  19.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  20.         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  21.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
  22.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  StringDeserializer.class);
  23.         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  false);
  24.         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,  "earliest");
  25.         return new DefaultKafkaConsumerFactory<>(props);
  26.     }
  27.    
  28.    
  29.  
  30.     @Bean
  31.     KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
  32.         ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
  33.                                 new ConcurrentKafkaListenerContainerFactory<>();
  34.         factory.setConsumerFactory(consumerFactory());
  35.         factory.setConcurrency(3);
  36.         factory.getContainerProperties().setAckMode(AckMode.MANUAL);
  37.         return factory;
  38.     }
  39.    
  40.  
  41.    
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement