I have a Kafka Streams app that uses a KTable. The app has been running for a while, so the KTable is already populated.

How do I truncate the KTable (assuming my app can handle rebuilding the table)?

Is stopping my app and deleting the data from the changelog topic the correct way? That seems a little complicated; I feel there should be a simpler way.

Thank you

Suanmeiguo
  • What do you mean by "truncate" -- a `KTable` is a collection/set of key-value pairs. What are you trying to achieve? Please add more details to your question. – Matthias J. Sax Oct 09 '18 at 02:34
  • @MatthiasJ.Sax Basically I want to delete all existing key-value pairs in that collection, but not reset the Kafka topic offsets. So when the KTable comes back, it won't regenerate all the existing values, but I still want to use new values arriving in the changelog topic. – Suanmeiguo Oct 09 '18 at 17:14
  • I see. So do you read the KTable directly from a topic (i.e., `builder.table()`), or is it the result of an aggregation? Also, how do you decide when you want to wipe out the data? – Matthias J. Sax Oct 09 '18 at 21:32
  • It is from an aggregation. But does that change how we do it (vs. if it's from `builder.table()`)? This is not a frequent operation. It's only for our dev and testing, or maybe a one-time cleanup if prod gets screwed up. Thank you for your help. – Suanmeiguo Oct 09 '18 at 22:24
  • An example: we might have fed some invalid values to the input earlier, so the aggregation generated invalid data in the KTable. I want to refresh the KTable with newer, correct values, but cannot simply send all the keys again because some keys no longer exist in my input topic. – Suanmeiguo Oct 09 '18 at 22:27
  • Yes, there is a difference between the two cases. For `builder.table()` there is no changelog topic; the source topic is used instead (it's an optimization to avoid data redundancy -- both topics would contain the exact same data). For your example, I don't understand why you need to wipe out the whole store though? – Matthias J. Sax Oct 10 '18 at 00:06
  • I just want to start with a clean state for testing my change. This is at dev time. Or maybe there's a better way to do it than wiping out the store? – Suanmeiguo Oct 10 '18 at 17:47
  • For development, you can use the reset tool as described in the docs: https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html – Matthias J. Sax Oct 10 '18 at 17:55
  • Thank you @MatthiasJ.Sax. As I understand it, the reset tool will delete the KTable's changelog topic, correct? – Suanmeiguo Oct 11 '18 at 00:27
  • Yes, it will delete all changelog and repartition topics. – Matthias J. Sax Oct 11 '18 at 00:31
  • @MatthiasJ.Sax Thank you very much for your help! – Suanmeiguo Oct 11 '18 at 21:13

1 Answer

If you want to re-create the KTable from scratch, one option is the application reset tool.

The application reset tool deletes all changelog and repartition topics created for an application ID, so on the next run you get a new KTable and state store.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
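
As a rough sketch of what invoking the reset tool can look like: the application ID, broker address, and install path below are placeholders (assumptions, not values from the question), and flag names differ between Kafka releases, so check the tool's `--help` output for your version. The script only previews the reset via `--dry-run`.

```shell
#!/bin/sh
# Placeholder values (assumptions): replace with your own.
APP_ID="my-streams-app"               # the application.id of the Streams app
BOOTSTRAP_SERVERS="localhost:9092"    # broker address
# Script name as shipped in the Apache Kafka distribution; the install path is assumed.
RESET_TOOL="${KAFKA_HOME:-/opt/kafka}/bin/kafka-streams-application-reset.sh"

# Stop all running instances of the application before resetting.

if [ -x "$RESET_TOOL" ]; then
  # --dry-run only prints what would be done; drop it to perform the reset.
  # Older releases use --bootstrap-servers; newer ones may expect --bootstrap-server.
  "$RESET_TOOL" \
    --application-id "$APP_ID" \
    --bootstrap-servers "$BOOTSTRAP_SERVERS" \
    --dry-run
else
  echo "reset tool not found at $RESET_TOOL"
fi
```

The tool works on the broker side. The documentation linked in the comments also describes clearing an instance's local state directory, for example by calling `KafkaStreams#cleanUp()` before `start()`.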

Matthias J. Sax
Nishu Tayal