Let's say I use the Kafka Streams Processor API to write a very basic processor that deduplicates incoming messages. The processor uses a fault-tolerant state store to keep track of the last version received for a given key.

Basically, when the processor receives a message, it does the following:

public void process(String key, Message value) {
    // Last version forwarded for this key, or null if the key has not been seen yet.
    Long lastVersion = store.get(key);
    if (lastVersion == null || value.getVersion() > lastVersion) {
        context.forward(key, value);
        store.put(key, value.getVersion());
    }
}

Both context.forward() and store.put() produce a record, each to a different topic (the sink topic and the store's changelog topic, respectively, since fault tolerance is enabled). Because producer sends are asynchronous and may target different brokers, if the application is stopped right after process() returns, the first send may never complete (the record is still in a batch, not sent yet) while the second send succeeds.

Since the consumer offset was not committed before the crash, the incoming message with version=1 will be processed again by another consumer after the rebalance, but the store (rebuilt from the changelog) already contains lastVersion=1. The application therefore ignores this message, and the message is lost.
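The failure sequence described above can be sketched without Kafka at all. The following is only a simplified model, not Kafka Streams code: the class DedupLossDemo, the in-memory "topics", and the sinkDelivered flag are all hypothetical stand-ins that mimic a changelog write that succeeds while the sink write is lost in a crash.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupLossDemo {

    /**
     * Same deduplication logic as process() in the question.
     * sinkDelivered == false models a crash before the sink record leaves the producer batch.
     */
    static void process(Map<String, Long> store, Map<String, Long> changelog,
                        List<String> sink, String key, long version, boolean sinkDelivered) {
        Long lastVersion = store.get(key);
        if (lastVersion == null || version > lastVersion) {
            if (sinkDelivered) {
                sink.add(key + "@v" + version);   // stands in for context.forward()
            }
            store.put(key, version);              // stands in for store.put() ...
            changelog.put(key, version);          // ... and its changelog record
        }
    }

    /** Runs the crash-and-redeliver scenario and returns what reached the sink. */
    static List<String> simulate() {
        List<String> sink = new ArrayList<>();
        Map<String, Long> changelog = new HashMap<>();

        // First attempt: changelog write succeeds, sink write is lost, offset is not committed.
        process(new HashMap<>(), changelog, sink, "k", 1L, false);

        // After the rebalance: the store is restored from the changelog and the
        // same input record is redelivered because its offset was never committed.
        Map<String, Long> restoredStore = new HashMap<>(changelog);
        process(restoredStore, changelog, sink, "k", 1L, true);

        return sink;
    }

    public static void main(String[] args) {
        // The redelivered record is filtered out as a duplicate, so the sink stays empty.
        System.out.println("sink = " + simulate());
    }
}
```

In this model the second call sees lastVersion=1 in the restored store and drops the record, so nothing ever reaches the sink even though the input was consumed twice.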

Question: Is there a way to keep at-least-once guarantees (the default provided by Kafka Streams) when using the store as in this sample?

For example, by adding some kind of manual flush between context.forward() and store.put()? Or is the only safe option to go all the way and enable exactly-once semantics?

  • There is no chance to accomplish what you want without enabling exactly-once. – Matthias J. Sax Feb 23 '19 at 05:01
  • Thanks for your confirmation. What I find surprising is that it means there are a lot of use cases for stateful processing that cannot happen without enabling exactly-once, i.e. Kafka Streams was pretty limited before transaction API and idempotent producers were implemented. – Jean Pasquali Feb 26 '19 at 06:45
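
For reference, enabling exactly-once as suggested in the comments is a configuration change rather than a code change in the processor. The snippet below is a minimal sketch using plain string keys so that it compiles without the Kafka dependency; the application id and bootstrap address are hypothetical placeholders. "processing.guarantee" is the Kafka Streams setting (StreamsConfig.PROCESSING_GUARANTEE_CONFIG), and "exactly_once" is the value available in Kafka Streams versions current at the time of this question; later releases introduced "exactly_once_v2" as its replacement, so check the documentation for the version in use.

```java
import java.util.Properties;

public class ExactlyOnceConfigSketch {

    /** Builds Streams properties with exactly-once processing enabled. */
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "dedup-app");          // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        // Default is "at_least_once"; with exactly-once, the sink write, the changelog
        // write, and the offset commit are committed together in one transaction.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty("processing.guarantee"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor along with the topology.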

0 Answers