We are writing a Kafka Streams topology that aggregates data and displays it in real time. We would like to make the display as robust as possible: ideally, log the offending record and continue for any exception.
According to the documentation, a few tests of our own, and
- Handling exceptions in Kafka streams
- Handling bad messages using Kafka's Streams API
- https://groups.google.com/g/confluent-platform/c/p75CleJ9yU0
Kafka Streams supports handling exceptions that occur in the producer or during deserialization very well. The provided LogAndContinueExceptionHandler gives exactly the behavior we want. However, our main problem is exceptions occurring during processing (such as in .mapValues() or .leftJoin()).
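For reference, this is roughly how we configure the deserialization handler today; a minimal sketch, where the application id and broker address are just placeholders:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-display");  // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder broker
// Log and skip records that cannot be deserialized instead of shutting down the topology:
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler.class.getName());
```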
The ideas we had were basically to validate preconditions:
- During deserialization, throwing an exception (which is then logged and the record skipped) if they are not fulfilled.
- As checks in the processing functions, returning default values if a calculation cannot be performed (division-by-zero errors, etc.); see the sketch after this list.
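To illustrate the second idea, here is a minimal sketch of a defensive .mapValues(); the Stats value type, the topic name, and the logger name are placeholders we made up for this example:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Placeholder value type for this sketch:
class Stats { int total; int count; }

Logger log = LoggerFactory.getLogger("aggregation-display");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Stats> input = builder.stream("input-topic");      // placeholder topic
KStream<String, Integer> perItem = input.mapValues(stats -> {
    try {
        return stats.total / stats.count;  // throws ArithmeticException when count == 0
    } catch (ArithmeticException e) {
        log.warn("Could not compute per-item value, falling back to default", e);
        return 0;                          // default value so the topology keeps running
    }
});
```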
However, if there is something unforeseen in the data, an exception could still bubble up and the topology would shut down.
Kafka Streams provides an UncaughtExceptionHandler, but it is only called after the thread has already died, so it cannot be used to prevent a topology shutdown.
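This is the kind of handler we can register today; a sketch assuming the Thread.UncaughtExceptionHandler overload of KafkaStreams#setUncaughtExceptionHandler, with the log message and variable names as placeholders:

```java
import org.apache.kafka.streams.KafkaStreams;

// topology (builder) and props as built above
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Runs only after the stream thread has already died, so it can merely log:
streams.setUncaughtExceptionHandler((thread, throwable) ->
        log.error("Stream thread {} died, topology is shutting down", thread.getName(), throwable));
streams.start();
```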
Is there some way to write an UncaughtExceptionHandler that skips the offending record? Or, alternatively, is there a mechanism to skip the current record that we could use in a try-catch block inside the processing function?