
I have a Spring Boot app with a single Kafka consumer that reads messages from a topic. But sometimes errors occur while handling a message.

I want to continue receiving the following messages as usual, and at the same time not lose the failed message but receive it again, for example, the next time the service is restarted with the consumer after the bug is fixed.

Is it possible to do this?

I understand that I need to disable auto-commit and commit successful messages manually. But in that case, if I don't throw an exception for the failed message and commit each subsequent successful message manually, then I will lose the previous unsuccessful one, right?

Kiril Mytsykov

3 Answers


If I understand your question correctly, your assumption is that the exception occurs due to a problem in your code and not while reading the message from the topic. In that case, no retry or other measures will solve your problem.

What we usually do is catch the exception and send the failed message to another Kafka topic. Ideally, you also add some details on why or in which part of the code the exception occurred. After you have fixed the bug in your application, you can consume the messages from that other topic.
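As an illustration, here is a minimal sketch of that pattern in Spring Kafka; the topic names "orders" and "orders.DLT" and the process() method are assumptions, not code from the question:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    @Component
    public class OrderListener {

        private final KafkaTemplate<String, String> kafkaTemplate;

        public OrderListener(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(topics = "orders")
        public void listen(ConsumerRecord<String, String> record) {
            try {
                process(record.value()); // your business logic
            } catch (Exception e) {
                // Forward the failed message plus some context to an error topic;
                // the offset on "orders" is still committed, so consumption continues.
                kafkaTemplate.send("orders.DLT", record.key(),
                        record.value() + " | error=" + e.getMessage());
            }
        }

        private void process(String value) { /* business logic */ }
    }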

I understand that I need to disable auto-commit and commit successful messages manually. But in that case, if I don't throw an exception for the failed message and commit each subsequent successful message manually, then I will lose the previous unsuccessful one, right?

Yes, your understanding is correct. To be more precise, you will not "lose" the message, but as soon as your consumer group commits a higher offset, it will never try to read the lower offsets again without manual intervention.
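To make that trap concrete, here is a sketch of manual commits with Spring Kafka; the factory bean name, topic, and process() method are illustrative assumptions, and enable.auto.commit must be false in the consumer config:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.support.Acknowledgment;

    public class ManualCommitConfig {

        // Container factory that hands an Acknowledgment to the listener.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }

        @KafkaListener(topics = "orders", containerFactory = "manualAckFactory")
        public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
            process(record.value());
            // Committing this offset also "covers" any earlier offset that was
            // skipped: the consumer group will never be handed that record again.
            ack.acknowledge();
        }

        private void process(String value) { /* business logic */ }
    }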

Alternative

If you only expect very rare cases where an exception could be thrown and you simply skip them, you can always use the consumer.seek() method of the plain Kafka consumer

public void seek(TopicPartition partition, long offset)

to start reading from a particular offset of a topic partition.
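For example, a standalone consumer could be pointed back at the failed offset like this; the broker address, topic, partition, and offset 42 are placeholders:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class SeekExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "repair-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Assign the partition directly (no group rebalancing) and rewind.
                TopicPartition partition = new TopicPartition("orders", 0);
                consumer.assign(Collections.singletonList(partition));
                consumer.seek(partition, 42L); // offset of the failed message

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                records.forEach(r ->
                        System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
            }
        }
    }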

Michael Heil
  • I mean any system problems as well as our own bugs in the code. After receiving a message, the consumer sends it to another service, and if something goes wrong while that service saves the message, we can't lose it; we have to handle it again after our fixes and redeployment. So yes, at this point I think it's a good idea to push such messages to another topic like "problem-messages", start its consumer on application startup, and once handling is finished, pause that consumer until the application starts up again (after the next bugfixes) – Kiril Mytsykov Aug 31 '20 at 09:31
  • In the end, I decided to push problem messages to another topic with a paused consumer, and I want to consume those messages after restarting the application with the consumer started (via @KafkaListener). But after the restart I don't receive the messages that were pushed to this topic earlier (and were not consumed because the consumer was paused at that moment) – Kiril Mytsykov Sep 01 '20 at 17:29

Yes, you have to commit them manually. Retry a particular message 2-3 times; if it still fails after the retries, you can move it to another topic and consume those messages once you fix whatever is causing the failure. This will not block your queue, and you won't lose any messages either.
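In Spring Kafka (2.8+), this retry-then-park behaviour does not have to be hand-written; one possible sketch, assuming a KafkaTemplate bean is available:

    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    public class RetryConfig {

        // Retry each failed record twice, one second apart; after that the
        // recoverer publishes it to "<original-topic>.DLT" by default.
        @Bean
        public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
            return new DefaultErrorHandler(
                    new DeadLetterPublishingRecoverer(template),
                    new FixedBackOff(1000L, 2L));
        }
    }

The handler still has to be registered on the listener container factory, e.g. via factory.setCommonErrorHandler(errorHandler).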

Saurabh Nigam

I want to continue receiving the following messages as usual, and at the same time not lose the failed message but receive it again, for example, the next time the service is restarted with the consumer after the bug is fixed.

Is it possible to do this?

  1. You don't need to do a manual commit. Instead, you can implement a retry mechanism by publishing the event to another queue and consuming it with a delay (see the sketch after this list). Amazon SQS has a delay queue, but unfortunately there is no such thing in Kafka, so you have to write the implementation yourself.

    Reference articles:

    Article 1

    Article 2

  2. If you retry message processing, the order of the messages can change depending on your implementation. Please keep that in mind.

  3. Do remember that Kafka considers a consumer dead if the message processing time exceeds max.poll.interval.ms. Read this
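A rough sketch of such a hand-written delay on a retry topic follows; the topic name, the one-minute delay, and the manual-ack container factory are assumptions, and per point 3 above, the sleep must stay well below max.poll.interval.ms:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;

    import java.time.Duration;

    public class RetryTopicListener {

        private static final long DELAY_MS = Duration.ofMinutes(1).toMillis();

        @KafkaListener(topics = "orders.retry", containerFactory = "manualAckFactory")
        public void retry(ConsumerRecord<String, String> record, Acknowledgment ack)
                throws InterruptedException {
            long dueAt = record.timestamp() + DELAY_MS;
            long wait = dueAt - System.currentTimeMillis();
            if (wait > 0) {
                // Naive delay: block until the record is "due". Keep DELAY_MS well
                // below max.poll.interval.ms, or the broker will evict this consumer.
                Thread.sleep(wait);
            }
            process(record.value());
            ack.acknowledge();
        }

        private void process(String value) { /* business logic */ }
    }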

Sahil Gupta