Advertisement
Guest User

Untitled

a guest
Mar 16th, 2020
106
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.75 KB | None | 0 0
  1. @Configuration
  2. @RequiredArgsConstructor
  3. public class KafkaConfigs {
  4.  
  5. private final UrlsConfig urlsConfig;
  6.  
  7. @Bean
  8. public Properties propertiesConsumer() {
  9. final Properties props = new Properties();
  10. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, urlsConfig.getBootstrapServer());
  11. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  12. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  13. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  14. props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, urlsConfig.getSchemaRegistry());
  15. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  16. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
  17. props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
  18. return props;
  19. }
  20.  
  21. @Bean
  22. public Properties propertiesProducer() {
  23. Properties props = new Properties();
  24. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  25. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  26. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  27. props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
  28. props.put(ProducerConfig.ACKS_CONFIG, "1");
  29. return props;
  30. }
  31. }
  32.  
  33. @Service
  34. @RequiredArgsConstructor
  35. @Slf4j
  36. public class PartnerListener {
  37. private final String PARTNER_TOPIC = "partner-topic";
  38.  
  39. private final PartnerRepository partnerRepository;
  40. private final KafkaConfigs kafkaConfigs;
  41.  
  42. @KafkaListener(topics = {PARTNER_TOPIC}, groupId = "partner-id")
  43. public void consume(ConsumerRecord<String, PartnerMessage> consumerRecord) {
  44. ...
  45. }
  46. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement