@AnushanTY
Created September 19, 2018 08:07
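import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

try {
    while (true) {
        ConsumerRecords<String, GenericRecord> records;
        try {
            // poll(long) is deprecated since Kafka 2.0 in favor of poll(Duration).
            records = consumer.poll(10000);
        } catch (SerializationException e) {
            // Message: "Error deserializing key/value for partition <topic>-<partition>
            // at offset <offset>. If needed, please seek past the record to continue consumption."
            String s = e.getMessage()
                    .split("Error deserializing key/value for partition ")[1]
                    .split("\\. If needed, please seek past the record to continue consumption\\.")[0];
            // Topic names may contain '-', so the partition starts after the last one.
            String[] parts = s.split(" at offset ");
            int dash = parts[0].lastIndexOf('-');
            String topic = parts[0].substring(0, dash);
            int partition = Integer.parseInt(parts[0].substring(dash + 1));
            long offset = Long.parseLong(parts[1]); // Kafka offsets are long, not int
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            continue; // this poll returned nothing; retry past the bad record
        }
        for (ConsumerRecord<String, GenericRecord> record : records) {
            System.out.printf("value = %s%n", record.value());
        }
    }
} finally {
    consumer.close();
}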
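
The string splitting above can also be done with a single regular expression, which is easier to test without a running broker. The following is a minimal sketch, assuming the exception message keeps the shape the snippet already relies on ("... for partition <topic>-<partition> at offset <offset>. ..."). `BadRecordLocation` and its `parse` method are hypothetical names for illustration and are not part of the Kafka client API.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Kafka class): extracts topic, partition and offset from the error message. */
public final class BadRecordLocation {
    // Assumed message shape, taken from the strings the snippet splits on.
    // The greedy (.+) backtracks to the last '-' before the partition digits,
    // so topic names that contain '-' are handled.
    private static final Pattern LOCATION =
            Pattern.compile("for partition (.+)-(\\d+) at offset (\\d+)");

    public final String topic;
    public final int partition;
    public final long offset;

    private BadRecordLocation(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    /** Returns the parsed location, or empty if the message does not match the assumed shape. */
    public static Optional<BadRecordLocation> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = LOCATION.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new BadRecordLocation(
                m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        String msg = "Error deserializing key/value for partition my-topic-3 at offset 42. "
                + "If needed, please seek past the record to continue consumption.";
        parse(msg).ifPresent(loc ->
                System.out.println(loc.topic + "-" + loc.partition + " @ " + loc.offset));
    }
}
```

In the catch block, an empty result could be treated as a reason to rethrow the exception instead of seeking, so that an unexpected message format does not silently skip the wrong record.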