
We have a Kafka Streams application that consumes messages from a source topic, does some processing and forwards the results to a destination topic.

The structure of the messages is controlled by some Avro schemas.

When the application starts consuming messages, if a schema is not cached yet it will try to retrieve it from the Schema Registry. If for whatever reason the Schema Registry is not available (say, a network glitch), the message currently being processed is lost, because the default handler is something called LogAndContinueExceptionHandler. Here is an extract from our logs:

o.a.k.s.e.LogAndContinueExceptionHandler : Exception caught during Deserialization, taskId: 1_5, topic: my.topic.v1, partition: 5, offset: 142768
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 62
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
...
o.a.k.s.p.internals.RecordDeserializer : stream-thread [my-app-StreamThread-3] task [1_5] Skipping record due to deserialization error. topic=[my.topic.v1] partition=[5] offset=[142768]
...
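For context, the handler in that log is the one set through the default.deserialization.exception.handler Streams property. A minimal sketch of the relevant configuration (application id and bootstrap servers are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class StreamsErrorHandlingConfig {

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // With LogAndContinueExceptionHandler a record that fails deserialization is
        // logged and skipped, which is exactly the behaviour in the log above.
        // Setting org.apache.kafka.streams.errors.LogAndFailExceptionHandler instead
        // makes the stream thread fail rather than silently dropping the record.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);

        return props;
    }
}
```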

So my question is: what would be the proper way of dealing with situations like the one described above, making sure you don't lose messages no matter what? Is there an out-of-the-box LogAndRollbackExceptionHandler, or a way of implementing your own?
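As far as I can tell, "implementing your own" would mean implementing org.apache.kafka.streams.errors.DeserializationExceptionHandler and registering it through the property shown above. A minimal sketch of what that looks like (the class name is made up for illustration); note that the handler can only tell Streams to CONTINUE (skip the record) or FAIL (stop the task), so it cannot by itself roll back and retry the record:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

/** Hypothetical handler that refuses to skip records: it logs the failure and asks Streams to fail. */
public class LogAndFailLoudlyExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        System.err.printf("Deserialization failed for %s-%d at offset %d: %s%n",
                record.topic(), record.partition(), record.offset(), exception);
        // FAIL stops the processing task so the record is not silently dropped; the
        // stream thread (or the whole instance) then has to be restarted to resume.
        return DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
```

It would be registered by pointing default.deserialization.exception.handler at this class, as in the configuration sketch above.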

Thank you in advance for your inputs.

Julian
  • You'd typically use `LogAndFailExceptionHandler` to fail the entire pipeline and then retry if you want. – BadZen Jun 21 '21 at 06:26
  • I am aware of that, but if my understanding is right that will fail the whole streams application. This is a bit extreme for a glitch that usually lasts just a few milliseconds or even less. What I would like instead is to roll back the message and then process it again and again for as long as the environment issue is not resolved. – Julian Jun 21 '21 at 06:41
  • A milliseconds-long network glitch will absolutely not cause anything like that. If you really are having transient registry failures that you want to retry around instead of fixing, and you want to maintain per-message retry granularity, you'll probably need to implement a custom deserializer and wire it into the container (see the sketch after these comments). – BadZen Jun 21 '21 at 07:16
  • I added an extract from our logs showing that we lose (skip) messages when the Avro schema is not reachable – Julian Jun 23 '21 at 00:40
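A minimal sketch of the custom retrying deserializer suggested in the comment above (the class name, retry count and backoff are illustrative, and a real implementation would probably inspect the exception cause to distinguish a transient registry timeout from genuinely bad data):

```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Hypothetical wrapper that retries the delegate deserializer a few times before
 * giving up, so a short Schema Registry outage never surfaces as a deserialization
 * error and the record is never handed to LogAndContinueExceptionHandler.
 */
public class RetryingDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> delegate;
    private final int maxAttempts; // assumed >= 1
    private final Duration backoff;

    public RetryingDeserializer(Deserializer<T> delegate, int maxAttempts, Duration backoff) {
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
        this.backoff = backoff;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        SerializationException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return delegate.deserialize(topic, data);
            } catch (SerializationException e) {
                // Possibly a transient failure such as a Schema Registry timeout;
                // wait and try again instead of letting the record be skipped.
                last = e;
                try {
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw last;
                }
            }
        }
        throw last;
    }

    @Override
    public void close() {
        delegate.close();
    }
}
```

It could then be wrapped in a Serde (for example with Serdes.serdeFrom(existingSerializer, new RetryingDeserializer<>(existingDeserializer, 5, Duration.ofMillis(200))), where the serializer and deserializer names are hypothetical) and used wherever the topic is consumed.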

1 Answer


I've not worked a lot with Kafka, but when I did, I remember having issues such as the one you are describing in our system.

Let me tell you how we took care of our scenarios; maybe it will help you out too:

Scenario 1: If your messages are being lost on the publishing side (publisher --> Kafka), you can configure the Kafka acknowledgement setting according to your needs. If you use Spring Cloud Stream with Kafka, the property is spring.cloud.stream.kafka.binder.required-acks. (A plain-producer equivalent is sketched after the list below.)

Possible values:

  1. At most once (Ack=0)

    1. Publisher does not care if Kafka acknowledges or not.
    2. Send and forget
    3. Data loss is possible
  2. At least once (Ack=1)

    1. If Kafka does not acknowledge, publisher resends message.

    2. Possible duplication.

    3. Acknowledgment is sent before message is copied to replicas.

  3. Exactly once (Ack=all)

    1. If Kafka does not acknowledge, publisher resends message.

    2. However, if a message gets sent more than once to Kafka, there is no duplication.

    3. An internal sequence number is used to decide whether the message has already been written to the topic or not.

    4. The min.insync.replicas property needs to be set to define the minimum number of replicas that must be in sync before Kafka acknowledges to the producer.
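If it helps to see the same settings outside Spring Cloud Stream, this is roughly what acks=all plus an idempotent producer looks like with the plain Java client (broker address, topic, key and value are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksAllProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // acks=all: the broker acknowledges only after the write has reached the
        // in-sync replicas (how many is governed by min.insync.replicas on the
        // broker/topic side).
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // Idempotence is what gives the "no duplication on resend" behaviour: the
        // broker uses the producer id plus sequence numbers to discard retries it
        // has already written.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my.topic.v1", "key", "value")); // placeholders
            producer.flush();
        }
    }
}
```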

Scenario 2: If your data is being lost on the consumer side (Kafka --> consumer), you can change the auto-commit behaviour of Kafka according to your usage. If you are using Spring Cloud Stream, the property is spring.cloud.stream.kafka.bindings.input.consumer.AutoCommitOffset.

By default, AutoCommitOffset is true in Kafka, and every message that is sent to the consumer is "committed" on Kafka's end, meaning it won't be sent again. However, if you change AutoCommitOffset to false, you will have the power to poll the message from Kafka in your code and, once you are done with your work, explicitly commit the offset to let Kafka know that you are now done with the message.

If a message is not committed, Kafka will keep redelivering it until it is.
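For reference, with the plain Java consumer the manual-commit pattern described above looks roughly like this (broker address, group id and topic are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Turn off auto-commit so offsets only move forward once processing succeeds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my.topic.v1")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // your processing; if it throws, nothing is committed
                }
                // Commit only after the whole batch was processed; if the application
                // crashes before this line, the records are delivered again.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```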

Hope this helps you out, or at least points you in the right direction.

Wasif Raza
  • We experienced the loss on the consumer side. However, I don't think your solution will work when using Kafka Streams, which is what our application is using. I think you are talking about a Kafka sink and source connectors approach. – Julian Jun 21 '21 at 08:46