
I am really new to Kafka, and I noticed that after a consumer reads a message from a topic, the message seems to be gone. If this is the normal behavior of topics... is there a way to not automatically remove a topic message after it is read?

By the way, these are the commands I run to start my local Kafka server:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

Kafka version: kafka_2.12-1.0.0

I use Kafka to store all the messages that will be processed (read) by my mail sender service. The problem with this is that after a topic message (mail message) is read by my mail service, it seems to be gone forever, regardless of whether the mail was sent successfully or not.

I am wondering if this is normal with Kafka.
If it is, is there a way to remove the topic message only once the mail has been sent successfully?

Sorry, if my question is totally noobish.

Bill Shannon
Borgy Manotoy
  • You are slightly confused - there is no such thing as message removal after reading in Kafka (there is message removal after a configured retention time, but you shouldn't be hitting that). You should be managing your consumer offsets properly - in your case, probably committing them only after you have finished processing successfully. You can see a bit more at https://stackoverflow.com/questions/46546489/how-does-kafka-consumer-auto-commit-work?rq=1 – Artur Biesiadowski Apr 16 '18 at 11:59

2 Answers


Your consumer's offset advances after you read a message; the message itself stays in the topic.

If I understand your question correctly, you probably need to always read from earliest.

Look into properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
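For context, a minimal consumer setup with that property might look like the sketch below. The broker address, group id, and deserializers are assumptions, not taken from the question:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EarliestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed local broker address and group id
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mail-sender");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // "earliest": if this group has no committed offset yet,
        // start reading from the oldest record still in the topic
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe and poll as usual...
        consumer.close();
    }
}
```

Note that `auto.offset.reset` only applies when the group has no committed offset; it does not rewind a group that has already committed.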

charlb

You don't have to be sorry.

In Kafka, a message (or record) is not deleted after it is consumed.

Your consumer has a property named "group.id", and the broker associates an offset with it. This offset is also stored in an internal Kafka topic named "__consumer_offsets".

If you stop your consumer and start it again (with the same group.id value), the broker will send you the records newer than the last offset your consumer committed before it stopped. If you start another consumer with a different group.id, there is no existing offset for that group, and depending on the strategy you choose with the consumer property properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); you will re-consume the whole topic.
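For the mail-sender case in the question, this suggests disabling auto-commit and committing offsets only after the mail is actually sent. A minimal sketch, assuming a local broker, a topic named "mail-events", and a hypothetical sendMail helper (none of these names come from the question):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MailSenderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mail-sender");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit so the offset advances only when we say so
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mail-events"));
            while (true) {
                // poll(long timeoutMs) is the overload available in the 1.0.0 client
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    sendMail(record.value()); // hypothetical helper; assumed to throw on failure
                }
                // Commit only after the whole batch was sent successfully;
                // if sendMail throws, nothing is committed and the records
                // are redelivered when the consumer restarts
                consumer.commitSync();
            }
        }
    }

    // Hypothetical placeholder for the real mail-sending logic
    static void sendMail(String message) { /* ... */ }
}
```

The record itself is never removed by this; it stays in the topic until retention expires. Only your group's committed offset moves, which is what makes a message "consumed" from your service's point of view.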

If that's not the case, maybe the retention of your topic is too short? You can check it with the following command: kafka-topics --zookeeper host_zoo:port_zoo --topic your_topic --describe

Quentin Geff