Advertisement
Guest User

Kafka Consumer Java

a guest
Aug 4th, 2016
236
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.36 KB | None | 0 0
  1. public class ConsumerLoop implements Runnable {
  2.   private final KafkaConsumer<String, String> consumer;
  3.   private final List<String> topics;
  4.   private final int id;
  5.  
  6.   public ConsumerLoop(int id,
  7.                       String groupId,
  8.                       List<String> topics) {
  9.     this.id = id;
  10.     this.topics = topics;
  11.     Properties props = new Properties();
  12.     props.put("bootstrap.servers", "localhost:9092");
  13.     props.put(“group.id”, groupId);
  14.     props.put(“key.deserializer”, StringDeserializer.class.getName());
  15.     props.put(“value.deserializer”, StringDeserializer.class.getName());
  16.     this.consumer = new KafkaConsumer<>(props);
  17.   }
  18.  
  19.   @Override
  20.   public void run() {
  21.     try {
  22.       consumer.subscribe(topics);
  23.  
  24.       while (true) {
  25.         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
  26.         for (ConsumerRecord<String, String> record : records) {
  27.           Map<String, Object> data = new HashMap<>();
  28.           data.put("partition", record.partition());
  29.           data.put("offset", record.offset());
  30.           data.put("value", record.value());
  31.           System.out.println(this.id + ": " + data);
  32.         }
  33.       }
  34.     } catch (WakeupException e) {
  35.       // ignore for shutdown
  36.     } finally {
  37.       consumer.close();
  38.     }
  39.   }
  40.  
  41.   public void shutdown() {
  42.     consumer.wakeup();
  43.   }
  44. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement