Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- try {
- while (true) {
- ConsumerRecords<String, GenericRecord> records = null;
- try {
- records = consumer.poll(10000);
- } catch (SerializationException e) {
- String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
- String topics = s.split("-")[0];
- int offset = Integer.valueOf(s.split("offset ")[1]);
- int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
- TopicPartition topicPartition = new TopicPartition(topics, partition);
- //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
- consumer.seek(topicPartition, offset + 1);
- }
- for (ConsumerRecord<String, GenericRecord> record : records) {
- System.out.printf("value = %s \n", record.value());
- }
- }
- } finally {
- consumer.close();
- }
Add Comment
Please, Sign In to add comment