
I have a pipeline like this:

env.addSource(kafkaConsumer, name_source)
            .keyBy { value -> value.f0 }
            .window(EventTimeSessionWindows.withGap(Time.seconds(2)))
            .process(MyProcessor())
            .addSink(kafkaProducer)
            

The keys are guaranteed to be unique in the data currently being processed, so I would expect the state size not to grow beyond roughly 2 seconds' worth of data.
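
For context, MyProcessor is essentially a ProcessWindowFunction along the following lines (a simplified sketch, not the actual code; the Tuple2<String, String> element type and the emitted string are placeholders):

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Simplified sketch. The only state a pipeline like this should keep is the set of
// events buffered for each open session window, and Flink clears that buffer when
// the window fires (2 seconds of event time after the last event for the key).
class MyProcessor : ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
    override fun process(
        key: String,
        context: Context,
        elements: Iterable<Tuple2<String, String>>,
        out: Collector<String>
    ) {
        // emit one record per closed session
        out.collect("$key: ${elements.count()} events in session")
    }
}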

However, I notice the state size has been steadily growing over the last day (since the app was deployed).

[screenshot: checkpointed state size metric climbing steadily since deployment]

Is this a bug in Flink?

I am using Flink 1.11.2 on AWS Kinesis Data Analytics.

pdeva
  • Is it possible for you to have multiple events on the same key for a continuous amount of time? If a single key continues to observe events, the session would not be closed, and events on that key would continue to be buffered in state... – Pablo May 07 '21 at 04:32
  • No, we are specifically testing this with unique keys to ensure the state doesn't grow (but it does...) – pdeva May 07 '21 at 04:33

1 Answer


Kinesis Data Analytics always uses RocksDB as its state backend. With RocksDB, dead state isn't immediately cleaned up; it's merely marked with a tombstone and later compacted away. I'm not sure how KDA configures RocksDB compaction, but typically compaction runs once a level reaches a certain size -- and I suspect your state is still small enough that compaction hasn't happened yet.
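
This isn't something you can change on KDA, which manages the RocksDB configuration for you, but for reference: on a self-managed Flink 1.11 deployment you could nudge compaction to happen sooner with a RocksDBOptionsFactory roughly like the sketch below. The class name and the size thresholds are illustrative, not recommendations.

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
import org.rocksdb.ColumnFamilyOptions
import org.rocksdb.DBOptions

// Illustrative sketch: smaller file/level size targets make RocksDB compact
// (and thus drop tombstoned window state) earlier, at the cost of more I/O.
class EagerCompactionOptions : RocksDBOptionsFactory {

    override fun createDBOptions(
        currentOptions: DBOptions,
        handlesToClose: MutableCollection<AutoCloseable>
    ): DBOptions = currentOptions  // leave the DB-level options untouched

    override fun createColumnOptions(
        currentOptions: ColumnFamilyOptions,
        handlesToClose: MutableCollection<AutoCloseable>
    ): ColumnFamilyOptions = currentOptions
        .setTargetFileSizeBase(16L * 1024 * 1024)    // smaller SST files
        .setMaxBytesForLevelBase(64L * 1024 * 1024)  // compact each level sooner
}

// on a self-managed RocksDBStateBackend this would be registered via setRocksDBOptions(EagerCompactionOptions())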

With incremental checkpoints (which is what KDA uses), checkpointing is done by copying RocksDB's SST files -- which in your case are presumably full of stale data. If you let this run long enough you should eventually see a significant drop in checkpoint size once compaction has run.
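
For comparison, this is roughly the setup that KDA manages for you; the checkpoint path and interval below are placeholders and are not something you configure from your own code on KDA:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment()
// RocksDB backend with incremental checkpoints enabled: each checkpoint uploads
// the newly created SST files, so stale-but-not-yet-compacted data still counts
// toward the reported checkpoint size.
env.setStateBackend(RocksDBStateBackend("s3://your-bucket/checkpoints", true))
env.enableCheckpointing(60_000L)  // placeholder interval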

David Anderson
  • I don't see that happening on AWS Kinesis: the checkpoints have grown to > 700 MB and keep growing. I don't know what "long enough" means; our pipeline has been running continuously for 21 days – sakisk Nov 24 '22 at 10:47
  • @sakisk did you ever resolve this issue? I am facing something similar with a stateful Flink app running on AWS Kinesis Data Analytics – r_g_s_ Apr 19 '23 at 20:15
  • Not entirely, but there are a few things that helped: a) switched to the latest Kinesis supported Flink version (1.15.2), b) increased the checkpoint interval to 5 minutes, c) [cleaned up explicit state](https://stackoverflow.com/q/76043019/581351). Still not at the level I want, but I think that at this point it's because of `RichAsyncFunction` using external resources (e.g. a relational database) – sakisk Apr 21 '23 at 06:51