Advertisement
Kulasangar

Consumer

May 26th, 2016
353
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.83 KB | None | 0 0
  1. package com.company;
  2.  
  3. /**
  4.  * Created by kula on 5/26/2016.
  5.  */
  6.  
  7. import kafka.consumer.Consumer;
  8. import kafka.consumer.ConsumerConfig;
  9. import kafka.consumer.ConsumerIterator;
  10. import kafka.consumer.KafkaStream;
  11. import kafka.javaapi.consumer.ConsumerConnector;
  12.  
  13. import java.util.*;
  14.  
  15. public class KafConsumer {
  16.  
  17.     private final ConsumerConnector consumer;
  18.     private final String topic;
  19.     public static int count = 0;
  20.     public static ConsumerIterator<byte[], byte[]> it;
  21.  
  22.     public KafConsumer(String zookeeper, String groupId, String topic) {
  23.         Properties props = new Properties();
  24.         props.put("zookeeper.connect", zookeeper);
  25.         props.put("group.id", groupId);
  26.         props.put("zookeeper.session.timeout.ms", "500");
  27.         props.put("zookeeper.sync.time.ms", "250");
  28.         props.put("auto.commit.interval.ms", "1000");
  29.  
  30.         consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
  31.         this.topic = topic;
  32.     }
  33.  
  34.     public void testConsumer() {
  35.  
  36.  
  37.         Map<String, Integer> topicCount = new HashMap<>();
  38.         topicCount.put(topic, 1);
  39.  
  40.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
  41.         List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
  42.         for (final KafkaStream stream : streams) {
  43.             it = stream.iterator();
  44.             while (it.hasNext()) {
  45.                 System.out.println("Message from Single Topic: " + new String(it.next().message()));
  46.             }
  47.         }
  48.  
  49.  
  50.         if (consumer != null) {
  51.             consumer.shutdown();
  52.         }
  53.  
  54.     }
  55.  
  56.     public static void main(String[] args) {
  57.         KafConsumer simpleHLConsumer = new KafConsumer("localhost:2181", "testgroup", "Games");
  58.         simpleHLConsumer.testConsumer();
  59.     }
  60.  
  61. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement