
We are writing a Kafka Streams topology that aggregates data and displays it in real time. We would like to make the display as robust as possible - ideally logging the record and continuing on any exception.

According to the documentation and a few tests of our own, Kafka Streams supports handling exceptions that occur in the producer or during deserialization quite well. The provided LogAndContinueExceptionHandler gives exactly the behavior we want. Our main problem, however, is exceptions occurring during processing (such as in .mapValues() or .leftJoin()).
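For reference, the log-and-continue behavior for deserialization errors is enabled through the `default.deserialization.exception.handler` config. A minimal sketch (the topic and the rest of the config are omitted; with kafka-streams on the classpath you would normally use the `StreamsConfig` constant instead of the raw key):

```java
import java.util.Properties;

Properties props = new Properties();
// use the built-in handler that logs the bad record and continues processing
props.put("default.deserialization.exception.handler",
        "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
// equivalent, using the constants from kafka-streams:
// props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
//         LogAndContinueExceptionHandler.class);
```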

The ideas we had were basically to validate preconditions:

  1. during deserialization, throwing a DeserializationException (and logging and continuing) if they are not met;
  2. as checks in the processing functions, returning default values if a calculation cannot be performed (division-by-zero errors, etc.).
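Idea 2 can be sketched as a guarded computation; `safeRatio` here is a hypothetical helper, not part of our actual topology:

```java
// hypothetical helper used inside .mapValues(): check the precondition
// and fall back to a default value instead of letting the exception bubble up
static long safeRatio(long numerator, long denominator) {
    if (denominator == 0L) {
        return 0L; // default instead of "ArithmeticException: / by zero"
    }
    return numerator / denominator;
}
```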

However, if there is something unforeseen in the data, an exception could still bubble up and shut the topology down.

Kafka Streams provides an UncaughtExceptionHandler, but it is called after the thread has already died, so it cannot be used to prevent a topology shutdown.

Is there some way to write an UncaughtExceptionHandler that skips a record? Or, alternatively, is there a mechanism to skip the current record that we could use in a try-catch block inside the processing function?
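To illustrate the try-catch variant we have in mind: the processing function could return a sentinel such as null and a following .filter((k, v) -> v != null) could drop the record (`tryParse` is a made-up example function):

```java
// hypothetical parse step used inside .mapValues(); on failure it logs the
// record and returns null so that a subsequent .filter() can skip it
static Integer tryParse(String value) {
    try {
        return Integer.valueOf(value);
    } catch (NumberFormatException e) {
        System.err.println("Skipping unparseable record: " + value);
        return null;
    }
}
```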

maow

1 Answer


I think the best solution is to write your processing operations (e.g. mappers, filters, etc.) in a way that never throws an exception. To do that, you can use a wrapper object that is either a Success or an Error (e.g. the Either type in Scala). After that, you can use the branch() method to get two streams: one for successful records and one for errors.

The code below shows the basic idea:

    public static void main(String[] args) {
        var builder = new StreamsBuilder();
        KStream<Object, Result<Object>> stream = builder.stream("my-topic")
            .map((k, v) -> {
                try {
                    // unsafe operation, i.e. one that may throw an exception
                    return KeyValue.pair(k, new Success<>(v));
                } catch (Exception e) {
                    return KeyValue.pair(k, new Error<>(e));
                }
            });
        KStream<Object, Result<Object>>[] branch = stream.branch((k, v) -> !v.hasError(), (k, v) -> v.hasError());

        // Handle the success stream
        KStream<Object, Result<Object>> successStream = branch[0];

        // Handle the error stream, e.g. log errors or write them to a Dead Letter Queue
        KStream<Object, Result<Object>> errorStream = branch[1];
        
    }

    public interface Result<T> {
        T get() throws Exception;
        Exception exception();
        boolean hasError();
    }

    public static class Success<T> implements Result<T> {

        private final T value;

        public Success(T value) {
            this.value = value;
        }

        @Override
        public T get() throws Exception {
            return value;
        }

        @Override
        public Exception exception() {
            return null;
        }

        @Override
        public boolean hasError() {
            return false;
        }
    }

    public static class Error<T> implements Result<T> {

        private final Exception error;

        public Error(Exception error) {
            this.error = error;
        }

        @Override
        public T get() throws Exception {
            throw error;
        }

        @Override
        public Exception exception() {
            return error;
        }

        @Override
        public boolean hasError() {
            return true;
        }
    }

In addition, for the deserialization exceptions you mentioned, the Azkarra Streams project provides some convenient Java classes that can help you (e.g. SafeSerdes, DeadLetterTopicExceptionHandler): GitHub

fhussonnois