Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.company;
- /**
- * Created by kula on 5/26/2016.
- */
- import kafka.consumer.Consumer;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- import java.util.*;
- public class KafConsumer {
- private final ConsumerConnector consumer;
- private final String topic;
- public static int count = 0;
- public static ConsumerIterator<byte[], byte[]> it;
- public KafConsumer(String zookeeper, String groupId, String topic) {
- Properties props = new Properties();
- props.put("zookeeper.connect", zookeeper);
- props.put("group.id", groupId);
- props.put("zookeeper.session.timeout.ms", "500");
- props.put("zookeeper.sync.time.ms", "250");
- props.put("auto.commit.interval.ms", "1000");
- consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
- this.topic = topic;
- }
- public void testConsumer() {
- Map<String, Integer> topicCount = new HashMap<>();
- topicCount.put(topic, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
- List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
- for (final KafkaStream stream : streams) {
- it = stream.iterator();
- while (it.hasNext()) {
- System.out.println("Message from Single Topic: " + new String(it.next().message()));
- }
- }
- if (consumer != null) {
- consumer.shutdown();
- }
- }
- public static void main(String[] args) {
- KafConsumer simpleHLConsumer = new KafConsumer("localhost:2181", "testgroup", "Games");
- simpleHLConsumer.testConsumer();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement