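@Override
public void nextTuple() {
    // Open a high-level consumer and request a single stream for the topic.
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(ImmutableMap.of(topic, 1));
    KafkaStream<Message> stream = topicMessageStreams.get(topic).get(0);
    try {
        int batchSize = 0;
        // By default, iterating the stream blocks while waiting for the next message.
        for (MessageAndMetadata<Message> message : stream) {
            // convertToString is a helper that is not shown in this paste.
            LOGGER.info(convertToString(message.message().payload(), "UTF-8"));
            // Post-increment: the loop exits after the sixth message has been logged.
            if (batchSize++ == 5) {
                break;
            }
        }
    } catch (Exception e) {
        LOGGER.error(e);
    } finally {
        // Shut down even when consumption fails, so the connector is not leaked.
        consumerConnector.shutdown();
    }
}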