We have a streams application that consumes messages from a source topic, does some processing and forward the results to a destination topic.
The structure of the messages are controlled by some avro schemas.
When starting consuming messages if the schema is not cached yet the application will try to retrieve it from schema registry. If for whichever reason the schema registry is not available (say a network glitch) then the currently being processed message is lost because the default handler is something called LogAndContinueExceptionHandler
.
o.a.k.s.e.LogAndContinueExceptionHandler : Exception caught during Deserialization, taskId: 1_5, topic: my.topic.v1, partition: 5, offset: 142768
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 62
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
...
o.a.k.s.p.internals.RecordDeserializer : stream-thread [my-app-StreamThread-3] task [1_5] Skipping record due to deserialization error. topic=[my.topic.v1] partition=[5] offset=[142768]
...
So my question is what would be the proper way of dealing with situations like described above and make sure you don't lose messages no matter what. Is there an out of the box LogAndRollbackExceptionHandler
error handler or a way of implementing your own?
Thank you in advance for your inputs.