I am trying to use the LogAndContinueExceptionHandler on deserialization. It works fine when an error occurs: the bad record is logged and processing continues. However, suppose I have a continuous stream of errors on my incoming messages. If I stop and restart the Kafka Streams application, the messages that failed and were already logged in the previous run reappear and get logged again. This is even more problematic if I try to send the failed messages to a DLQ (sketched below): on a restart, they are sent to the DLQ again. As soon as a good record comes in, the offset appears to move forward, and the already-logged messages no longer show up on subsequent restarts.

Is there a way to manually commit within the Streams application? I tried ProcessorContext#commit(), but that doesn't seem to have any effect.
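To clarify what I mean by "send the failed messages to a DLQ": my handler is along these lines (a sketch only; the class name, topic name, and bootstrap servers are placeholders, not my exact code):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DlqDeserializationExceptionHandler implements DeserializationExceptionHandler {

    private KafkaProducer<byte[], byte[]> dlqProducer;
    private String dlqTopic;

    @Override
    public void configure(final Map<String, ?> configs) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        dlqProducer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
        dlqTopic = "words-dlq"; // placeholder topic name
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Forward the raw bytes of the failed record to the DLQ, then skip it.
        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }
}

With a handler like this, the same failed records go through handle() again after every restart, so they land in the DLQ more than once.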
I reproduced this behavior by running the sample provided here: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
I changed the incoming value Serde to Serdes.Integer().getClass().getName() to force a deserialization error on every input record, reduced the commit interval to just 1 second, and added the following to the config:
streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
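For completeness, the other two changes look roughly like this (a sketch of the relevant lines; the rest of the configuration is unchanged from the example):

// Deserialize the string-encoded input values as integers, which fails for every record.
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
// Commit every second so that skipped offsets should be committed quickly.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);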
When I restart the app after these failures, the same records that failed before show up in the logs again. For example, I see the following output on the console on every restart. I would expect these records not to be retried, since we already skipped them before:
2018-01-27 15:24:37,591 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 113
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
2018-01-27 15:24:37,592 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 114
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
It looks like when a deserialization exception occurs, this flag is never set to true: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228. It seems to become true only once processing succeeds. That might be why the commit does not happen even after I manually call ProcessorContext#commit().
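To be concrete, this is roughly how I tried to force a commit (a sketch; the processor exists only to call commit(), "words" is my input topic, and builder is the StreamsBuilder from the example):

final KStream<String, Integer> wordStream = builder.stream("words");
wordStream.process(() -> new AbstractProcessor<String, Integer>() {
    @Override
    public void process(final String key, final Integer value) {
        // Only reached for records that deserialize successfully; the records skipped by
        // the handler never get here, so this appears to have no effect while only bad
        // records are arriving.
        context().commit();
    }
});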
I would appreciate any help on this matter.
Thank you.