Advertisement
Guest User

Untitled

a guest
Feb 27th, 2017
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.68 KB | None | 0 0
  /**
   * Creates a {@code KafkaConsumer} from {@code props}, subscribes it to a copy of
   * {@code this.subscribedTopics}, waits for those topics via
   * {@code waitAndAssertTopicsPresent}, and then issues a first {@code poll} using
   * {@code CONSUMER_POLL_TIMEOUT}.
   *
   * NOTE(review): the remainder of the method is elided in this paste (the "....." line),
   * so record processing and whether the consumer is ever closed cannot be seen from here.
   */
  1. public void execute() {
  2. final KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(props);
  3. consumer.subscribe(new ArrayList<>(this.subscribedTopics));
  4. // wait to make sure the topics that we subscribe to are indeed present
  5. this.waitAndAssertTopicsPresent(consumer);
  6. final ConsumerRecords<K, V> consumerRecords = consumer.poll(CONSUMER_POLL_TIMEOUT);
  7. .....
  8. }
  9.  
  10. private void waitAndAssertTopicsPresent(final KafkaConsumer kafkaConsumer) {
  11. if (this.subscribedTopics.isEmpty()) {
  12. return;
  13. }
  14. final int maxAttempts = 3;
  15. for (final String topic : this.subscribedTopics) {
  16. int currentAttempt = 0;
  17. while (currentAttempt < maxAttempts) {
  18. currentAttempt++;
  19. logger.debug("Checking if topic " + topic + " is present, attempt " + currentAttempt);
  20. List<PartitionInfo> partitionsForTopic = null;
  21. try {
  22. partitionsForTopic = kafkaConsumer.partitionsFor(topic);
  23. } catch (TimeoutException tee) {
  24. // ignore
  25. if (logger.isTraceEnabled()) {
  26. logger.trace("Ignoring exception that occurred checking if topic " + topic + " was present", tee);
  27. }
  28. }
  29. if (partitionsForTopic != null) {
  30. boolean topicFound = false;
  31. for (final PartitionInfo partitionInfo : partitionsForTopic) {
  32. if (partitionInfo == null) {
  33. continue;
  34. }
  35. if (topic.equals(partitionInfo.topic())) {
  36. // found the topic, so we are done checking for the presence of this topic
  37. logger.debug("Presence of topic " + topic + ", which was needed to consume messages from, confirmed");
  38. topicFound = true;
  39. break;
  40. }
  41. }
  42. if (topicFound) {
  43. break;
  44. }
  45. }
  46. // the topic wasn't found, wait for a specific amount of time, before checking again
  47. final int numMilliSecondsToWait = 5 * 1000; // 5 seconds
  48. logger.debug("Topic " + topic + " seems to be absent, waiting for " + numMilliSecondsToWait + " milli seconds before checking again");
  49. try {
  50. Thread.sleep(numMilliSecondsToWait);
  51. } catch (InterruptedException e) {
  52. logger.info("Thread " + Thread.currentThread() + " was interrupted while waiting to check for the presence of topic " + topic);
  53. throw new IllegalStateException("Topic " + topic + ", which was needed to consume messages from, seems to be missing");
  54. }
  55. }
  56. }
  57. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement