I have a pipeline like this:
    env.addSource(kafkaConsumer)
        .keyBy { value -> value.f0 }
        .window(EventTimeSessionWindows.withGap(Time.minutes(2)))
        .reduce(::reduceRecord)
        .addSink(kafkaProducer)
I want to expire keyed data with a TTL.
Some blog posts suggest that I need a ValueStateDescriptor for that.
I made one like this:
    val desc = ValueStateDescriptor("val state", MyKey::class.java)
    desc.enableTimeToLive(ttlConfig)
But how do I apply this descriptor to my pipeline so that the TTL expiry actually takes effect?