@Override
public void nextTuple() {
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(ImmutableMap.of(topic, 1));
KafkaStream<Message> stream = topicMessageStreams.get(topic).get(0);
try {
int batchSize = 0;
for (MessageAndMetadata<Message> message: stream) {
LOGGER.info(convertToString(message.message().payload(), "UTF-8"));
if (batchSize++ == 5) {
break;
}
}
consumerConnector.shutdown();
} catch (Exception e) {
LOGGER.error(e);
}
}