Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Configuration
- @RequiredArgsConstructor
- public class KafkaConfigs {
- private final UrlsConfig urlsConfig;
- @Bean
- public Properties propertiesConsumer() {
- final Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, urlsConfig.getBootstrapServer());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, urlsConfig.getSchemaRegistry());
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
- props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
- return props;
- }
- @Bean
- public Properties propertiesProducer() {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
- props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
- props.put(ProducerConfig.ACKS_CONFIG, "1");
- return props;
- }
- }
- @Service
- @RequiredArgsConstructor
- @Slf4j
- public class PartnerListener {
- private final String PARTNER_TOPIC = "partner-topic";
- private final PartnerRepository partnerRepository;
- private final KafkaConfigs kafkaConfigs;
- @KafkaListener(topics = {PARTNER_TOPIC}, groupId = "partner-id")
- public void consume(ConsumerRecord<String, PartnerMessage> consumerRecord) {
- ...
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement