
I am trying to use the LogAndContinueExceptionHandler on deserialization. It works fine when an error occurs: it logs the error and the application continues. However, say I have a continuous stream of errors on my incoming messages, and I stop and restart the Kafka Streams application. Then I see that the messages that failed and were already logged in my last attempt reappear (they get logged again). This is more problematic if I try to send the failing messages to a DLQ: on a restart, they are sent to the DLQ again. As soon as a good record comes in, the offset moves forward and the already-logged messages no longer show up on another restart. Is there a way to commit manually within the Streams application? I tried ProcessorContext#commit(), but that doesn't seem to have any effect.
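For reference, this is roughly how I tried to force the commit, via a pass-through Transformer (a minimal sketch against the 1.0 API; the topic names and String types are placeholders):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> input = builder.stream("words");

    input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            context.commit(); // request a commit; Streams performs it as soon as possible
            return KeyValue.pair(key, value);
        }

        // Deprecated no-op; still abstract in 1.0.x, so it has to be implemented there.
        public KeyValue<String, String> punctuate(final long timestamp) {
            return null;
        }

        @Override
        public void close() { }
    }).to("output-topic");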

I reproduced this behavior by running the sample provided here: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java

I changed the incoming value serde to Serdes.Integer().getClass().getName() to force a deserialization error on input, and I reduced the commit interval to just 1 second. I also added the following to the config:

    streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
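For completeness, the other two changes look like this (same streamsConfiguration variable as in the example; 1000 ms is the commit interval I used):

    // Force a deserialization failure on the String-encoded input values:
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.Integer().getClass().getName());
    // Commit every second instead of the 30-second default:
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);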

Once it fails and I restart the app, the same records that failed before appear in the logs again. For example, I see the following output on the console each time I restart the app. I would expect these not to be retried, since we already skipped them before.

    2018-01-27 15:24:37,591 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 113
    org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
    2018-01-27 15:24:37,592 WARN wordcount-lambda-example-client-StreamThread-1 o.a.k.s.p.i.StreamThread:40 - Exception caught during Deserialization, taskId: 0_0, topic: words, partition: 0, offset: 114
    org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

It looks like when deserialization exceptions occur, this flag is never set to true here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228. It seems it only becomes true once processing succeeds, which might be why the commit does not happen even after I manually call ProcessorContext#commit().

I appreciate any help on this matter.

Thank you.

sobychacko
  • https://stackoverflow.com/questions/43416178/how-to-commit-manually-with-kafka-stream – Matthias J. Sax Jan 27 '18 at 00:49
  • You can also try to reduce the commit interval (default is 30 seconds) via StreamsConfig. – Matthias J. Sax Jan 27 '18 at 00:49
  • Thank you for the response. I tried both options, but it seems it's not committing: the same messages that were already skipped are tried again on a restart. I will look further. – sobychacko Jan 27 '18 at 02:10
  • @MatthiasJ.Sax I updated the question with a way to reproduce the behavior I am observing. Could you tell whether that's the expected behavior or whether I am doing something wrong? Thanks a lot! – sobychacko Jan 27 '18 at 20:33
  • Thanks for the update. That makes sense -- if there are no messages at all, we don't commit -- this is a bug... Can you file a Jira ticket for this issue? Atm, there is not much you can do about it -- either you let it run until there is at least one message that can be processed, or you manipulate offsets manually to "step over" all bad messages. – Matthias J. Sax Jan 29 '18 at 23:50
  • @MatthiasJ.Sax I created the Jira issue here: https://issues.apache.org/jira/browse/KAFKA-6502. Thank you! – sobychacko Jan 30 '18 at 12:34
  • Thanks for creating the Jira. Would you mind putting an actual description into the Jira? Just linking carries the risk that nobody will understand the Jira if this question gets deleted... – Matthias J. Sax Jan 30 '18 at 17:48
  • @MatthiasJ.Sax Updated the Jira issue with the description from the post above. – sobychacko Feb 01 '18 at 02:51

0 Answers