
I'm building a Kafka Streams application that generates change events by comparing every new calculated object with the last known object.

So for every message on the input topic, I update an object in a state store. Every once in a while (using punctuate), I apply a calculation on this object and compare the result with the previous calculation result (coming from another state store).

To make sure this operation is consistent, I do the following after the punctuate triggers:

  1. write a tuple to the state store
  2. compare the two values, create change events and context.forward() them, so the events go to the results topic
  3. replace the tuple with the new value and write it to the state store

I use this tuple for scenarios where the application crashes or rebalances, so I can always send out the correct set of events before continuing.
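Roughly, the processor looks like the sketch below (this assumes the org.apache.kafka.streams.processor.api interfaces; InputValue, Result, ResultTuple, ChangeEvent, Calculation, Diff and the store names are placeholders, not my actual classes):

```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class ChangeEventProcessor implements Processor<String, InputValue, String, ChangeEvent> {

    private ProcessorContext<String, ChangeEvent> context;
    private KeyValueStore<String, InputValue> objectStore;
    private KeyValueStore<String, ResultTuple> resultStore;

    @Override
    public void init(ProcessorContext<String, ChangeEvent> context) {
        this.context = context;
        this.objectStore = context.getStateStore("object-store");   // placeholder store names
        this.resultStore = context.getStateStore("result-store");
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    @Override
    public void process(Record<String, InputValue> record) {
        objectStore.put(record.key(), record.value());   // keep the latest object per key
    }

    private void punctuate(long timestamp) {
        try (KeyValueIterator<String, InputValue> it = objectStore.all()) {
            while (it.hasNext()) {
                KeyValue<String, InputValue> entry = it.next();
                Result newResult = Calculation.apply(entry.value);
                ResultTuple tuple = resultStore.get(entry.key);
                Result previous = (tuple == null) ? null : tuple.newValue();

                // 1. write the (previous, new) tuple so a crash/rebalance can be recovered from
                resultStore.put(entry.key, new ResultTuple(previous, newResult));
                // 2. diff the two results and forward the change events to the results topic
                for (ChangeEvent event : Diff.between(previous, newResult)) {
                    context.forward(new Record<>(entry.key, event, timestamp));
                }
                // 3. replace the tuple with the new value only
                resultStore.put(entry.key, new ResultTuple(newResult, newResult));
            }
        }
    }
}
```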

Now, I noticed the resulting events are not always consistent, especially if the application frequently rebalances. It looks like in rare cases the Kafka Streams application emits events to the results topic while the changelog topic is not up to date yet. In other words, I produced something to the results topic, but my changelog topic is not in the same state yet.

So, when I do a stateStore.put() and the method call returns successfully, are there any guarantees when it will be on the changelog topic?

Can I enforce a changelog flush? When I do context.commit(), when will that flush+commit happen?

[Image: process flow]

Tim Van Laer

1 Answer


To get full consistency, you will need to enable processing.guarantee="exactly_once" -- otherwise, in case of a failure, you might get inconsistent results.
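For example, a minimal config sketch (application id and bootstrap servers are placeholders; newer Streams versions also offer an exactly_once_v2 guarantee):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfigExample {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "change-event-app");   // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // hypothetical broker
        // With exactly-once enabled, writes to the results topic, the changelog topic,
        // and the input offset commit are committed together as one transaction.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
```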

If you want to stay with "at_least_once", you might want to use a single store, and update the store after processing is done (i.e., after calling forward()). This minimizes the time window in which you can get inconsistencies.
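A sketch of how the punctuation could look with a single store (reusing the placeholder names from the sketch in the question; the point here is only the forward-then-put ordering):

```java
private void punctuate(long timestamp) {
    try (KeyValueIterator<String, InputValue> it = objectStore.all()) {
        while (it.hasNext()) {
            KeyValue<String, InputValue> entry = it.next();
            Result newResult = Calculation.apply(entry.value);
            Result oldResult = resultStore.get(entry.key);

            // forward the change events first ...
            for (ChangeEvent event : Diff.between(oldResult, newResult)) {
                context.forward(new Record<>(entry.key, event, timestamp));
            }
            // ... and only then overwrite the previous result, so the window in which
            // the results topic and the store can diverge is as small as possible
            resultStore.put(entry.key, newResult);
        }
    }
}
```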

And yes, if you call context.commit(), all stores will be flushed to disk and all pending producer writes will be flushed before the input topic offsets are committed.
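For example (a sketch; note that commit() only requests a commit, it does not perform it synchronously):

```java
private void punctuate(long timestamp) {
    // ... forward change events and update the stores as above ...
    context.commit();   // request a commit; Streams executes it as soon as possible
}
```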

Matthias J. Sax
  • Very interesting. I understand that `context.commit()` only requests a commit. Can you elaborate a bit on when that exactly happens? I currently do a `context.commit()` for every message, but that doesn't seem to effectively commit every message... – Tim Van Laer Sep 12 '18 at 14:57
  • After reading https://stackoverflow.com/questions/50312386/kafka-stream-consumer-commit-frequency?noredirect=1&lq=1 I understand the efficiency impact of committing every message :-) Still, I'm very curious in how and when the internals decide to commit. – Tim Van Laer Sep 12 '18 at 15:15
  • 2
    It am implementation detail. Basically, there is a loop to process consecutive records. We break this loop once in a while to check if user requested a commit and if yes execute it. If you want to know the details, you need to look into the code. Note, that it's an implementation detail and the design is done to run more efficiently. Also, it changes between versions. Current `trunk` implemenation: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L849-L891 – Matthias J. Sax Sep 12 '18 at 17:03
  • Thanks Matthias! Very much appreciated. – Tim Van Laer Sep 13 '18 at 10:02