Guest User

Untitled

a guest
Sep 19th, 2018
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.11 KB | None | 0 0
  1. try {
  2. while (true) {
  3. ConsumerRecords<String, GenericRecord> records = null;
  4. try {
  5. records = consumer.poll(10000);
  6. } catch (SerializationException e) {
  7. String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
  8. String topics = s.split("-")[0];
  9. int offset = Integer.valueOf(s.split("offset ")[1]);
  10. int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
  11.  
  12. TopicPartition topicPartition = new TopicPartition(topics, partition);
  13. //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
  14. consumer.seek(topicPartition, offset + 1);
  15. }
  16.  
  17.  
  18. for (ConsumerRecord<String, GenericRecord> record : records) {
  19.  
  20. System.out.printf("value = %s \n", record.value());
  21.  
  22.  
  23. }
  24.  
  25. }
  26.  
  27.  
  28. } finally {
  29. consumer.close();
  30. }
Add Comment
Please, Sign In to add comment