6

I have a Kafka Streams application whose offsets for the topic it is consuming get reset whenever I restart it. Hence, for all partitions, the lag increases and the app needs to reprocess all the data.

UPDATE: After the App gets restarted, the output topic receives a burst of events that were already processed; it is not that the input topic offsets are getting reset, as I stated in the previous paragraph. However, the offsets of the internal topic (KTABLE-SUPPRESS-STATE-STORE) are getting reset, see comments below.

I have ensured the lag is 1 for every partition before the restart (this is for the output topic). All consumers that belong to that consumer-group-id (app-id) are active. The restart is immediate; it takes around 30 secs.

The app is using exactly-once as its processing guarantee.

I have read this answer: How does an offset expire for an Apache Kafka consumer group?

I have tried with auto.offset.reset = latest and auto.offset.reset = earliest.
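
For reference, the relevant part of the configuration looks roughly like this (a minimal sketch; the application id and bootstrap servers are placeholders, not the real values):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");          // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// exactly-once processing guarantee, as mentioned above
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// tried with both "latest" and "earliest" here
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");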

It seems like the offsets for these topics are not effectively committed (but I am not sure about this).

I assume that after the restart the app should pick up from the latest committed offset for that consumer group.

UPDATE: I assume this for the internal topic (KTABLE-SUPPRESS-STATE-STORE)

Does the Kafka Streams API ensure that all consumed offsets are committed before shutting down (after calling streams.close())?
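
For context, this is roughly how the App is started and stopped (a simplified sketch; builder and props are the objects shown in the configuration above and in the topology code further below):

import org.apache.kafka.streams.KafkaStreams;

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Close the Streams client on JVM shutdown so that offsets are committed
// and the local state-store checkpoint files get written before the process exits.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));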

I would really appreciate any clue about this.

UPDATE:

This is the code the App executes:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
            .withTimestampExtractor(...));

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, newValue) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

The offset reset always happens (after restarting), and only for the KTABLE-SUPPRESS-STATE-STORE internal topic created by the Kafka Streams API.

I have tried with both the exactly-once and at-least-once processing guarantees.

Once again, I would really appreciate any clue about this.

UPDATE: This has been solved in release 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895).

  • You say you are not sure that offsets are really committed. Have you looked at the __consumer_offsets topic to verify this? This blog post might help to find this out: https://medium.com/@felipedutratine/kafka-consumer-offsets-topic-3d5483cda4a6 . – user152468 Jan 11 '19 at 11:14
  • Have you tried whether it works without the exactly-once processing guarantee? This is a relatively new feature. – user152468 Jan 11 '19 at 11:15
  • Thanks for the comment @user152468, yes, I have checked with the `bin/kafka-consumer-groups.sh` tool. I have not tried with other processing guarantees since in my case I need exactly-once, but it is worth trying just to rule that out as the issue. However, I would be surprised if it is. – Jonathan Santilli Jan 11 '19 at 11:57
  • Offsets should be committed on `streams.close()` -- try to verify this. Also, offsets should be committed every 100 ms by default if exactly-once is enabled. Not sure why the offsets are not picked up at startup again. I would recommend inspecting the logs -- maybe increase the log level to DEBUG to get more information. – Matthias J. Sax Jan 13 '19 at 04:59
  • Hello @MatthiasJ.Sax thanks for the reply, I have found this in the logs: INFO [MI-APP-ID-xxx-StreamThread-4] internals.StoreChangelogReader (StoreChangelogReader.java:215) - stream-thread [MI-APP-ID-xxx-StreamThread-4] **No checkpoint found** for task 1_5 state store KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog MI-APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-5 with EOS turned on. **Reinitializing the task and restore its state from the beginning**. ... continued in the next comment – Jonathan Santilli Jan 13 '19 at 20:26
  • INFO [MI-APP-ID-XXXStreamThread-4] internals.Fetcher (Fetcher.java:583) - [Consumer clientId=MI-APP-ID-XXX-StreamThread-4-restore-consumer, **groupId=**] Resetting offset for partition MI-APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-5 **to offset 0**. It is clear that it is resetting the offset for the changelog, but I do not know why; probably it is something obvious that I do not know. Is it normal that **groupId=** is empty? The **No checkpoint found** log refers to the checkpoint in the local store, right? Is that related to the **Resetting offset for partition** ... **to offset 0**? – Jonathan Santilli Jan 13 '19 at 20:30
  • On `KafkaStreams#close()` a local checkpoint file is written. If this checkpoint file is not found on startup, `KafkaStreams` needs to wipe out the store and recreate it from scratch. It uses a second `Consumer` without a group-id to perform the store recovery. The question is, why is there no checkpoint file? As you report that your input topic offsets are not committed either, it indicates that your `close()` does not finish cleanly? – Matthias J. Sax Jan 13 '19 at 20:50
  • I do not see any error or Exception after calling `KafkaStreams#close()`: **INFO [Thread-3] streams.KafkaStreams (KafkaStreams.java:902) - stream-client [MY-APP-ID-XXX] Streams client stopped completely**. Do you think I could be facing this bug @MatthiasJ.Sax https://issues.apache.org/jira/browse/KAFKA-7672 – Jonathan Santilli Jan 13 '19 at 21:04
  • Not sure. Hard to say. – Matthias J. Sax Jan 13 '19 at 21:43
  • Hello @MatthiasJ.Sax I have added the App code to the question in case you can guide me; any clue will be appreciated. – Jonathan Santilli Jan 14 '19 at 12:24

3 Answers

2

The offset reset always happens (after restarting), and only for the KTABLE-SUPPRESS-STATE-STORE internal topic created by the Kafka Streams API.

This is currently (version 2.1) expected behavior, because the suppress() operator works in-memory only. Thus, on restart, the suppress buffer must be recreated from the changelog topic before processing can start.

Note, it is planned to let suppress() write to disk in future releases (cf. https://issues.apache.org/jira/browse/KAFKA-7224). This will avoid the overhead of recreating the buffer from the changelog topic.

Matthias J. Sax
  • Thanks @MatthiasJ.Sax, so, is it normal to expect that the App will read the `KTABLE-SUPPRESS-STATE-STORE` topic from **offset 0** after the restart, and hence pass events that were already processed downstream to the output topic? – Jonathan Santilli Jan 17 '19 at 08:38
  • Already processed data will not be sent downstream again. The changelog data is just added to the suppress store before processing begins. If the data were not added to the store, you might have missing results (think: some record was added to the suppress buffer but never emitted, and no new update for the same key occurs). Also note that suppress eviction is based on event-time, so there is no need to evict anything if the buffer is reloaded on restart (event-time is still the same). – Matthias J. Sax Jan 17 '19 at 18:35
  • Thanks again @MatthiasJ.Sax, the changelog data is added to the suppress store (I guess in memory) before processing begins, reading the `KTABLE-SUPPRESS-STATE-STORE` topic always from **offset 0**? I am asking because the logs always show that the offset is reset to 0 (for `KTABLE-SUPPRESS-STATE-STORE` topic partitions). Do the offsets for `KTABLE-SUPPRESS-STATE-STORE` topic partitions not get committed after a successful suppression or a window close? I would expect to add to the buffer again records that were never emitted, or is it not like that? Thanks a lot in advance @MatthiasJ.Sax. – Jonathan Santilli Jan 17 '19 at 21:05
  • (1) offsets of changelog topics are never committed (2) because the buffer is in-memory, even if offsets were committed, it would be required to ignore them and reload from offset zero anyway -- otherwise, the buffer would not be loaded correctly (nothing is written to local disk -- it's an in-memory buffer atm). "I would expect to add to the buffer again records that were never emitted, or is it not like that?" -- that is possible of course. – Matthias J. Sax Jan 17 '19 at 21:36
  • Oh, much clearer @MatthiasJ.Sax, thanks! If I understood correctly, since offsets of the **suppress** changelog topic are never committed, there is no way at the moment for the Kafka Streams App to know which records were already emitted (since the App needs to replay from **offset 0**), unless there is some sort of metadata in the records stored in the **suppress** topic, am I right? Once again, thanks a lot Matthias for your help, time and explanation. – Jonathan Santilli Jan 18 '19 at 09:23
  • Correct -- however, offsets don't relate to which records were emitted already. Emitting is based on event-time, and each record in the buffer has its event-time stored in the message timestamp field. – Matthias J. Sax Jan 18 '19 at 17:41
  • Thanks @MatthiasJ.Sax, makes sense. 1.- The App replays the whole `suppress` changelog topic to rebuild the state store before processing begins. 2.- Once the whole store has been recreated (buffered in memory), the processing of events begins. 3.- Since the emitting process is based on event-time, when new records not yet emitted arrive (I guess compared with the event-time of the last record emitted located in the buffer), the suppression starts again until the window ends. If that's correct, my App is not doing it. After the restart, lots of records already processed end up in the output topic. – Jonathan Santilli Jan 18 '19 at 23:22
  • That sounds correct. -- About your last statement: you mean you get some output records as duplicates? That would be a bug. – Matthias J. Sax Jan 19 '19 at 02:21
  • Yes @MatthiasJ.Sax, although it is not deterministic, after the restart already processed records arrive in the output topic, on the order of magnitude of 10, approx. I have been reading the code, but it is not easy to spot whether there is a bug. If you can guide me on what or where to check, I will appreciate it a lot. Thanks! – Jonathan Santilli Jan 19 '19 at 09:45
  • If a record is in the buffer and it gets emitted, it should be deleted from the buffer (cf. `InMemoryTimeOrderedKeyValueBuffer#evictWhile()`) -- the `dirtyKey` set is written to the changelog topic on `flush()` and should delete the emitted records there. Thus, on restart, they should not be emitted again, as they should not be reloaded into the buffer. – Matthias J. Sax Jan 19 '19 at 17:47
  • 1
    Thanks @MatthiasJ.Sax, I will check that :) – Jonathan Santilli Jan 21 '19 at 10:49
  • I have exactly the same problem :( – Eduardo May 18 '19 at 02:47
0

I think @Matthias J. Sax's reply covers most of the internals of suppress. One thing I need to clarify though: when you say "restart the application", what exactly did you do? Did you shut down the whole application gracefully, and then restart it?

Guozhang Wang
  • Hello @GuozhangWang thanks for the reply. The restart involves **stopping** the App by calling `KafkaStreams#close()` within a shutdown hook and then **starting** the App again via the `java ...` command. – Jonathan Santilli Jan 25 '19 at 14:06
  • That sounds reasonable. I think Matthias' last comment is explaining the situation then. – Guozhang Wang Feb 07 '19 at 19:50
  • Hi @GuozhangWang I faced a similar problem (v2.3.1) when restarting the app: the repartition topic has bytes-in but no bytes-out, while the changelog topic has huge bytes-out and no bytes-in. I checked the log and noticed that the offset of the suppress changelog is reset to 0, and the status keeps changing between PARTITIONS_REVOKED and PARTITIONS_ASSIGNED. Is this expected? I didn't see the status change back to RUNNING. Please help! Thanks a lot!! – thinktwice Oct 21 '19 at 02:12
  • How large is the state you restart with? From the description it seems the suppression buffer was restored from scratch, which takes a very long time and hence causes the consumer to be kicked out of the group again. – Guozhang Wang Oct 22 '19 at 16:36
  • Hi @GuozhangWang, traffic is about 150K-180K messages/sec, so I assume it's a big state. It happens every time I redeploy the build. Is there anything I could do to prevent restoring the entire suppression buffer? Thanks a lot! – thinktwice Oct 30 '19 at 20:34
-2

Commit frequency is controlled by the parameter commit.interval.ms. Check whether your offsets are indeed committed. By default, offsets are committed every 100 ms (with exactly-once) or every 30 secs (otherwise), depending on your processing guarantee config. Check this out
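
For example, you can set it explicitly to verify the commit behavior (a minimal sketch; the 1000 ms value is purely illustrative, with exactly-once the default is already 100 ms):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// illustrative value only -- lower it if you want offsets committed more often
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);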

senseiwu
  • Thanks for the answer @senseiwu. Yes, the value for `commit.interval.ms` in this case is **100ms** (the default due to exactly-once). I am checking the consumer position with bin/kafka-consumer-groups.sh; before restarting, the LAG is 1, after the restart, the LAG goes through the roof. – Jonathan Santilli Jan 11 '19 at 11:49
  • 1
    Check [this](https://stackoverflow.com/questions/50934411/kafka-streams-exactly-once-delivery) post, especially comments by _Matthias J. Sax Jun 21 '18 at 17:24_ which explains offset commit mechanism a bit in detail – senseiwu Jan 11 '19 at 12:19
  • This is not an answer to the question. – Matthias J. Sax Jan 13 '19 at 04:59
  • Thanks for pointing that out. But from the question, it sounded like the OP is not sure about how periodic commits based on `commit.interval.ms` actually work. It is still not clear to me whether that is where he has an issue or whether offsets are getting reset as he further speculates. – senseiwu Jan 13 '19 at 08:06