I have a pipeline like this:

env.addSource(kafkaConsumer)
    .keyBy { value -> value.f0 }
    .window(EventTimeSessionWindows.withGap(Time.minutes(2)))
    .reduce(::reduceRecord)
    .addSink(kafkaProducer)

I want to expire keyed data with a TTL.

Some blog posts point out that I need a ValueStateDescriptor for that. I made one like this:

val desc = ValueStateDescriptor("val state", MyKey::class.java)
desc.enableTimeToLive(ttlConfig)

But how do I apply this descriptor to my pipeline so that the TTL expiry actually takes effect?

pdeva

1 Answer

The pipeline you've described doesn't use any keyed state that would benefit from setting state TTL. The only keyed state in your pipeline is the contents of the session windows, and that state is being purged as soon as possible -- as the sessions close. (Furthermore, since you are using a reduce function, that state consists of just one value per key.)

For the most part, expiring state is only relevant for state you create explicitly, in which case you have ready access to the state descriptor and can configure it to use State TTL. Flink SQL does create state on your behalf that might not expire automatically, in which case you will need to configure Idle State Retention Time. The CEP library also creates state on your behalf; there, you should ensure that your patterns either eventually match or time out.
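
To illustrate what "state you create explicitly" looks like, here is a minimal sketch of a keyed operator that registers its own state with a TTL-enabled descriptor. It is not part of the pipeline in the question: the class name `FirstSeenFilter`, the `Tuple2<String, Long>` record type, and the 10-minute TTL are all hypothetical, and the code assumes the Flink 1.x DataStream API. As far as I know, State TTL is measured in processing time, not event time.

```kotlin
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical operator: forwards a record only if its key has not been
// seen within the TTL period. The record type and TTL are assumptions.
class FirstSeenFilter :
    KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {

    @Transient
    private lateinit var seen: ValueState<Boolean>

    override fun open(parameters: Configuration) {
        // Entries expire 10 minutes after they were last written.
        val ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()

        val desc = ValueStateDescriptor("seen", Boolean::class.javaObjectType)
        desc.enableTimeToLive(ttlConfig)

        // The descriptor takes effect here, when the state is registered.
        seen = runtimeContext.getState(desc)
    }

    override fun processElement(
        value: Tuple2<String, Long>,
        ctx: Context,
        out: Collector<Tuple2<String, Long>>
    ) {
        // value() returns null when nothing is stored or the entry has expired.
        if (seen.value() == null) {
            seen.update(true)
            out.collect(value)
        }
    }
}
```

Such an operator would be attached after a `keyBy`, for example `stream.keyBy { it.f0 }.process(FirstSeenFilter())`. The point is that a descriptor only matters once it is passed to `getState` inside a function you write; the window operator manages its own state and does not accept one.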

David Anderson
  • What about the key itself? Is the key cleared after the window closes, or does it remain forever? If so, what if unique keys are constantly being added to the stream? – pdeva Apr 26 '21 at 12:44
  • The keys aren't stored on their own. When keyed state is cleared, the key/value pair is purged, and nothing remains behind. – David Anderson Apr 26 '21 at 12:49
  • I see. Am I correct in the following: in the pipeline code above, say I get just 1 event with key `x` and value `y`. Once the session for key `x` closes after 2 minutes, both `x` and `y` are removed from the state and the state is empty. – pdeva Apr 26 '21 at 20:43
  • Sounds like you understand, but to be pedantic: Once a watermark arrives that indicates that there was an interval at least two minutes long during which there were no events for `x`, both `x` and `y` are removed from the state. But the state will hold whatever event was used as evidence for the watermark that closed the session for `x`. – David Anderson Apr 27 '21 at 06:17
  • so i tested this out and it seems this answer is not accurate: https://stackoverflow.com/questions/67429035/flink-apps-checkpoint-size-keeps-growing – pdeva May 07 '21 at 04:35