
We have Kafka set up so that several servers can process messages in parallel. But every message must be processed exactly once (and by only one server). We have this up and running and it's working fine.

Now, the problem for us is that the Kafka consumer reads messages in batches for maximal efficiency. This leads to a problem if/when processing fails, the server shuts down, or whatever, because then we lose data that was about to be processed.

Is there a way to get the consumer to read only one message at a time, letting Kafka keep the unprocessed messages? Something like: consumer pulls one message -> process -> commit offset when done, repeat. Is this feasible using Kafka? Any thoughts/ideas?

Thanks!

Martin at Mavrix

3 Answers


You might try setting max.poll.records to 1.
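For what it's worth, here is a minimal sketch of how that could look with kafka-python's KafkaConsumer, which exposes the same setting as max_poll_records. The topic, broker, and group names are placeholders, and handle() is a stand-in for your real processing:

from kafka import KafkaConsumer

def handle(record):
    print(record.value)  # stand-in for real processing

# Placeholder names; max_poll_records=1 caps each poll at a single record,
# and disabling auto-commit lets us commit after each processed message.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    group_id='my-group',
    max_poll_records=1,
    enable_auto_commit=False,
)

for message in consumer:
    handle(message)
    consumer.commit()  # commit the offset only after processing succeeded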

gengel

You mention wanting exactly-once processing, but then you're worried about losing data. I'm assuming you're worried about the edge case where one of your servers fails and you lose data?

I don't think there's a way to consume just one message at a time. Looking through the consumer configurations, there only seems to be an option for setting the maximum bytes a consumer can fetch from Kafka, not a number of messages:

fetch.message.max.bytes

But if you're worried about losing data completely: if you never commit the offset, Kafka will not mark it as committed and the message won't be lost. From the Kafka documentation on delivery semantics:

So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.

So exactly-once processing is not something that Kafka enables by default. It requires you to store the offset yourself whenever you write the output of your processing to storage.

But this can be handled more simply and generally by simply letting the consumer store its offset in the same place as its output...As an example of this, our Hadoop ETL that populates data in HDFS stores its offsets in HDFS with the data it reads so that it is guaranteed that either data and offsets are both updated or neither is.
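As a rough sketch of the at-least-once pattern described above, again using kafka-python with placeholder names and a hypothetical handle() function: disable auto-commit, process the batch, and commit only afterwards, so a crash before the commit simply means the messages get re-delivered.

from kafka import KafkaConsumer

def handle(record):
    print(record.value)  # stand-in for real processing

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    group_id='my-group',
    enable_auto_commit=False,  # we decide when an offset counts as done
)

while True:
    batch = consumer.poll(timeout_ms=1000)  # {TopicPartition: [records]}
    for tp, records in batch.items():
        for record in records:
            handle(record)
    if batch:
        # Commit only after the whole batch was processed; a crash before
        # this point means the messages are simply re-delivered.
        consumer.commit()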

I hope that helps.

Morgan Kenyon

It depends on which client you are going to use. For C++ and Python, it is possible to consume ONE message at a time.

For Python, I used https://github.com/mumrah/kafka-python. The following code consumes one message at a time:

message = self.__consumer.get_message(block=False, timeout=self.IterTimeout, get_partition_info=True)

__consumer is an instance of SimpleConsumer.

See my question and answer here: How to stop Python Kafka Consumer in program?

For C++, I am using https://github.com/edenhill/librdkafka. The following code consumes one message at a time:

while (m_bRunning)
{
    // Start to read messages from the local queue.
    RdKafka::Message *msg = m_consumer->consume(m_topic, m_partition, 1000);
    msg_consume(msg);
    delete msg;
    m_consumer->poll(0);
}

m_consumer is a pointer to the C++ Consumer object (C++ API).

Hope this helps.

BAE
  • Do you mean that you consume the messages one by one, or that it's actually pulling one message at a time from Kafka? Because there is a big difference. What we want is to be able to pull one message, commit that offset when done, then pull again, and so on. – Martin at Mavrix Aug 22 '15 at 09:55
  • The consumers do the commit automatically by default. You can set the commit frequency; in Python, auto_commit_every_n is 100 by default. By the way, you need to set group_id. Each message will be consumed by only one consumer in the group. – BAE Aug 22 '15 at 12:58
  • You can commit programmatically using the API. A message is committed and the offset moves forward after it is consumed. You always need to set the start offset. – BAE Aug 22 '15 at 13:06
  • I think what you need relates to the consumer group and the start offset. The Apache Kafka docs are helpful. – BAE Aug 22 '15 at 13:11
  • By the way, in my experience, when network issues happen (e.g. a server goes down), client consumers can handle them. But the Python producer may produce a message twice each time the server restarts; you need extra error-handling logic. Hope this is helpful. – BAE Aug 22 '15 at 13:37