@Override public void nextTuple() { ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map>> topicMessageStreams = consumerConnector.createMessageStreams(ImmutableMap.of(topic, 1)); KafkaStream stream = topicMessageStreams.get(topic).get(0); try { int batchSize = 0; for (MessageAndMetadata message: stream) { LOGGER.info(convertToString(message.message().payload(), "UTF-8")); if (batchSize++ == 5) { break; } } consumerConnector.shutdown(); } catch (Exception e) { LOGGER.error(e); } }