1. @Override
  2.     public void nextTuple() {
  3.         ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
  4.         Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(ImmutableMap.of(topic, 1));
  5.         KafkaStream<Message> stream = topicMessageStreams.get(topic).get(0);
  6.         try {
  7.             int batchSize = 0;
  8.             for (MessageAndMetadata<Message> message: stream) {
  9.                 LOGGER.info(convertToString(message.message().payload(), "UTF-8"));
  10.                 if (batchSize++ == 5) {
  11.                     break;
  12.                 }
  13.             }
  14.             consumerConnector.shutdown();
  15.         } catch (Exception e) {
  16.             LOGGER.error(e);
  17.         }
  18.     }