Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Configuration
- @EnableKafka
- public class KafkaConsumerConfig
- {
- @Value(value = "${kafka.bootstrapAddress}")
- private String bootstrapAddress;
- @Value(value = "${general.topic.group.id}")
- private String groupId;
- @Value(value = "${user.topic.group.id}")
- private String userGroupId;
- @Bean
- public ConsumerFactory<Integer, String> consumerFactory() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- return new DefaultKafkaConsumerFactory<>(props);
- }
- @Bean
- KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
- new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(3);
- factory.getContainerProperties().setAckMode(AckMode.MANUAL);
- return factory;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement