Untitled

a guest
Jul 12th, 2012
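@Override
public void nextTuple() {
    // Note: as in the original, a new connector is created and shut down on every call.
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    try {
        // Request a single stream for the topic.
        Map<String, List<KafkaStream<Message>>> topicMessageStreams =
                consumerConnector.createMessageStreams(ImmutableMap.of(topic, 1));
        KafkaStream<Message> stream = topicMessageStreams.get(topic).get(0);
        int batchSize = 0;
        for (MessageAndMetadata<Message> message : stream) {
            LOGGER.info(convertToString(message.message().payload(), "UTF-8"));
            // Stop once five messages have been logged
            // (the original post-increment check let a sixth through).
            if (++batchSize == 5) {
                break;
            }
        }
    } catch (Exception e) {
        // Pass the exception as the throwable so the stack trace is logged.
        LOGGER.error("Failed to consume messages from topic " + topic, e);
    } finally {
        // Always release the connector, even if consumption fails.
        consumerConnector.shutdown();
    }
}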