Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public void execute() { // Builds a consumer from props, subscribes it, waits for the subscribed topics, then polls; the rest of the body is elided in this paste.
- final KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(props); // NOTE(review): no close() is visible in the lines shown - confirm the elided part closes the consumer
- consumer.subscribe(new ArrayList<>(this.subscribedTopics)); // subscribes using a fresh ArrayList copy of subscribedTopics
- // wait to make sure the topics that we subscribe to are indeed present
- this.waitAndAssertTopicsPresent(consumer);
- final ConsumerRecords<K, V> consumerRecords = consumer.poll(CONSUMER_POLL_TIMEOUT); // poll bounded by CONSUMER_POLL_TIMEOUT (defined outside this snippet)
- .....
- }
- private void waitAndAssertTopicsPresent(final KafkaConsumer kafkaConsumer) {
- if (this.subscribedTopics.isEmpty()) {
- return;
- }
- final int maxAttempts = 3;
- for (final String topic : this.subscribedTopics) {
- int currentAttempt = 0;
- while (currentAttempt < maxAttempts) {
- currentAttempt++;
- logger.debug("Checking if topic " + topic + " is present, attempt " + currentAttempt);
- List<PartitionInfo> partitionsForTopic = null;
- try {
- partitionsForTopic = kafkaConsumer.partitionsFor(topic);
- } catch (TimeoutException tee) {
- // ignore
- if (logger.isTraceEnabled()) {
- logger.trace("Ignoring exception that occurred checking if topic " + topic + " was present", tee);
- }
- }
- if (partitionsForTopic != null) {
- boolean topicFound = false;
- for (final PartitionInfo partitionInfo : partitionsForTopic) {
- if (partitionInfo == null) {
- continue;
- }
- if (topic.equals(partitionInfo.topic())) {
- // found the topic, so we are done checking for the presence of this topic
- logger.debug("Presence of topic " + topic + ", which was needed to consume messages from, confirmed");
- topicFound = true;
- break;
- }
- }
- if (topicFound) {
- break;
- }
- }
- // the topic wasn't found, wait for a specific amount of time, before checking again
- final int numMilliSecondsToWait = 5 * 1000; // 5 seconds
- logger.debug("Topic " + topic + " seems to be absent, waiting for " + numMilliSecondsToWait + " milli seconds before checking again");
- try {
- Thread.sleep(numMilliSecondsToWait);
- } catch (InterruptedException e) {
- logger.info("Thread " + Thread.currentThread() + " was interrupted while waiting to check for the presence of topic " + topic);
- throw new IllegalStateException("Topic " + topic + ", which was needed to consume messages from, seems to be missing");
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement