Advertisement
Guest User

Untitled

a guest
Aug 21st, 2024
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.22 KB | None | 0 0
  1. package io.conduktor.demos.kafka;
  2.  
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.apache.kafka.clients.consumer.ConsumerRecords;
  6. import org.apache.kafka.clients.consumer.KafkaConsumer;
  7. import org.apache.kafka.common.errors.WakeupException;
  8. import org.apache.kafka.common.serialization.StringDeserializer;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11.  
  12. import java.time.Duration;
  13. import java.util.Arrays;
  14. import java.util.Properties;
  15.  
  16. public class ConsumerDemo {
  17.     private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);
  18.  
  19.     public static void main(String[] args) {
  20.         log.info("I am a Kafka Consumer");
  21.  
  22.         String bootstrapServers = "127.0.0.1:9092";
  23.         String groupId = "my-fifth-application";
  24.         String topic = "demo_java";
  25.  
  26.         // create consumer configs
  27.         Properties properties = new Properties();
  28.         properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  29.         properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  30.         properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  31.         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  32.         properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  33.  
  34.         // create consumer
  35.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  36.  
  37.         // get a reference to the current thread
  38.         final Thread mainThread = Thread.currentThread();
  39.  
  40.         // adding the shutdown hook
  41.         Runtime.getRuntime().addShutdownHook(new Thread() {
  42.             public void run() {
  43.                 log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
  44.                 consumer.wakeup();
  45.  
  46.                 // join the main thread to allow the execution of the code in the main thread
  47.                 try {
  48.                     mainThread.join();
  49.                 } catch (InterruptedException e) {
  50.                     e.printStackTrace();
  51.                 }
  52.             }
  53.         });
  54.  
  55.         try {
  56.  
  57.             // subscribe consumer to our topic(s)
  58.             consumer.subscribe(Arrays.asList(topic));
  59.  
  60.             // poll for new data
  61.             while (true) {
  62.                 ConsumerRecords<String, String> records =
  63.                         consumer.poll(Duration.ofMillis(100));
  64.  
  65.                 for (ConsumerRecord<String, String> record : records) {
  66.                     log.info("Key: " + record.key() + ", Value: " + record.value());
  67.                     log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
  68.                 }
  69.             }
  70.  
  71.         } catch (WakeupException e) {
  72.             log.info("Wake up exception!");
  73.             // we ignore this as this is an expected exception when closing a consumer
  74.         } catch (Exception e) {
  75.             log.error("Unexpected exception", e);
  76.         } finally {
  77.             consumer.close(); // this will also commit the offsets if need be.
  78.             log.info("The consumer is now gracefully closed.");
  79.         }
  80.  
  81.     }
  82. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement