I have a topic named SOURCE which contains a stream of two kinds of messages, A and B. I have written a kafka-streams application that consumes that topic, finds an A and a B with the same correlation ID, aggregates them into a new message C, and puts that on the output topic DESTINATION.
Sometimes an A without a B (or vice versa) will be put on the SOURCE topic. I have created a queryable state store so I can look at these dangling messages, but now I'd like to delete a specific message from the intermediate topic. I'm guessing it's just a matter of getting a message with the right key (which I have) and null as the body into the intermediate topic. The question is: what is the best way to do that?
- Produce a special clear-message to SOURCE which would cause the aggregated message to become null
- Write a message directly to the intermediate topic with null data
- Some other way; maybe kafka-streams already has an API call for this?
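
To make the first option concrete, here is a minimal sketch of the behaviour I have in mind, modeled in plain Java with no Kafka dependency. The `CLEAR` kind, the `Partial` class and the `HashMap` standing in for the state store are all hypothetical names for illustration. It assumes that an aggregator returning null causes the key to be dropped from the store, which is my understanding of how kafka-streams aggregations treat a null result, but I have not verified it for my version.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the pairing step; not real kafka-streams code.
final class PairingSketch {
    // A and B are the real message kinds; CLEAR is the proposed special message (hypothetical).
    enum Kind { A, B, CLEAR }

    // Hypothetical partial aggregate: holds whichever halves have arrived so far.
    static final class Partial {
        final String a;
        final String b;

        Partial(String a, String b) {
            this.a = a;
            this.b = b;
        }

        boolean complete() {
            return a != null && b != null;
        }
    }

    // Stand-in for the state store, keyed by correlation ID.
    private final Map<String, Partial> store = new HashMap<>();

    // Mirrors an aggregator: returns the new aggregate, or null to request deletion.
    static Partial aggregate(Kind kind, String payload, Partial old) {
        if (kind == Kind.CLEAR) {
            return null;
        }
        String a = (old == null) ? null : old.a;
        String b = (old == null) ? null : old.b;
        if (kind == Kind.A) {
            a = payload;
        } else {
            b = payload;
        }
        return new Partial(a, b);
    }

    void onMessage(String correlationId, Kind kind, String payload) {
        Partial next = aggregate(kind, payload, store.get(correlationId));
        if (next == null) {
            store.remove(correlationId); // null aggregate is treated like a tombstone
        } else {
            store.put(correlationId, next);
        }
    }

    // True while exactly one half of the pair has been seen.
    boolean isDangling(String correlationId) {
        Partial p = store.get(correlationId);
        return p != null && !p.complete();
    }

    public static void main(String[] args) {
        PairingSketch sketch = new PairingSketch();
        sketch.onMessage("id-1", Kind.A, "a-payload");
        System.out.println(sketch.isDangling("id-1")); // prints "true"
        sketch.onMessage("id-1", Kind.CLEAR, null);
        System.out.println(sketch.isDangling("id-1")); // prints "false"
    }
}
```

The appeal of this route is that the deletion goes through the same topology as everything else, so the state store and the intermediate topic would stay in sync without anything writing to the internal topic from outside.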
Bonus question: If I know that I don't want messages to sit in the intermediate topic for longer than 6 months, can I instruct kafka-streams to create the intermediate topic with a 6-month retention, or should I create the topic manually before I run the app?
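
For the bonus question, this is roughly the configuration I would try, shown as plain `java.util.Properties` so it stands alone. The application ID and broker address are placeholders. The `topic.` prefix is, as far as I understand, what `StreamsConfig.topicPrefix(...)` produces for settings applied to the internal topics that kafka-streams creates. I am assuming "6 months" can be approximated as 180 days, and I am not sure whether `retention.ms` has any effect if the intermediate topic is created with a compact-only cleanup policy.

```java
import java.time.Duration;
import java.util.Properties;

// Sketch of streams configuration with a retention override for internal topics.
final class RetentionConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "pairing-app");       // placeholder name
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // "topic." + "retention.ms": intended to apply to topics the app creates itself.
        // 180 days is an approximation of 6 months.
        props.put("topic.retention.ms", String.valueOf(Duration.ofDays(180).toMillis()));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("topic.retention.ms")); // prints "15552000000"
    }
}
```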