We are using a Kafka KTable for aggregation. Below is the kind of data we receive as input.
Input data - Transaction Detail (transaction Id, status, category, amount, ...)
We group the above on the following key.
Grouping key - (status, category)
App logic
groupedStream.aggregate(() -> new Instance(), (key, newVal, aggVal) -> addAmount(newVal), (key, oldVal, aggVal) -> removeAmount(oldVal));
Let's say we get the stream of data below, as (transaction Id, status, category, amount):
1 - 1, pending, cash, 10 // (pending, cash) - 10 aggregated value
2 - 2, pending, cash, 20 // (pending, cash) - 30
3 - 3, actual, card, 15 // (actual, card) - 15
4 - 1, pending, card, 9 // (pending, cash) - 30, (pending, card) - 9 <- this is where we are getting the problem
In #4, although the update is on the same transaction Id 1, the grouping key changes (from cash to card). Since the grouping key has changed, removeAmount() is not called; only addAmount() is called, so (pending, cash) stays at 30 instead of dropping to 20.
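To make the expected behaviour concrete, here is a minimal, Kafka-free sketch in plain Java of the bookkeeping we would like to see. It is only an illustration under assumptions: the names RegroupingAggregator, Row, upsert and total are hypothetical and are not part of the Kafka Streams API. It remembers the latest row per transaction Id, and on an update it first retracts the old amount from the old (status, category) group before adding the new amount to the new group.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the desired aggregation; not Kafka Streams code.
class RegroupingAggregator {

    // Latest known grouping key and amount for one transaction.
    private static final class Row {
        final String groupKey;
        final int amount;

        Row(String groupKey, int amount) {
            this.groupKey = groupKey;
            this.amount = amount;
        }
    }

    // Latest row per transaction Id (plays the role of a table keyed by transaction Id).
    private final Map<Integer, Row> latestByTxn = new HashMap<>();
    // Running total per "(status, category)" grouping key.
    private final Map<String, Integer> totals = new HashMap<>();

    private static String groupKey(String status, String category) {
        return status + "|" + category;
    }

    void upsert(int txnId, String status, String category, int amount) {
        String newKey = groupKey(status, category);
        Row old = latestByTxn.put(txnId, new Row(newKey, amount));
        if (old != null) {
            // Equivalent of removeAmount(oldVal): retract from the OLD group.
            totals.merge(old.groupKey, -old.amount, Integer::sum);
        }
        // Equivalent of addAmount(newVal): add to the NEW group.
        totals.merge(newKey, amount, Integer::sum);
    }

    int total(String status, String category) {
        return totals.getOrDefault(groupKey(status, category), 0);
    }

    public static void main(String[] args) {
        RegroupingAggregator agg = new RegroupingAggregator();
        agg.upsert(1, "pending", "cash", 10);
        agg.upsert(2, "pending", "cash", 20);
        agg.upsert(3, "actual", "card", 15);
        agg.upsert(1, "pending", "card", 9); // transaction 1 moves from cash to card
        System.out.println("(pending, cash) = " + agg.total("pending", "cash"));
        System.out.println("(pending, card) = " + agg.total("pending", "card"));
        System.out.println("(actual, card) = " + agg.total("actual", "card"));
    }
}
```

With the four records above, this sketch ends with (pending, cash) = 20, (pending, card) = 9 and (actual, card) = 15, which is the result we would like the KTable aggregation to produce.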
Any idea how we can solve this, so that when the grouping key changes the previously aggregated data is adjusted as well?
I found a similar use case here: https://stackoverflow.com/a/42685866/2699756
But I am not sure what was done to fix it.