I have an input stream that I use to build a KTable. I then create an output stream from the KTable's changelog using the toStream() method. The problem is that the stream returned by toStream() does not contain all the messages from the input stream that updated my KTable. Here is my code:
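// Aggregate the input stream into a KTable (the initializer starts each key at null).
final KTable<String, event> kTable = inputStream.groupByKey().aggregate(
        () -> null,
        aggregateKtableMethod,
        storageConf);
// Turn the KTable's changelog back into a stream.
KStream<String, event> outputStream = kTable.toStream();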
I would like to get one message in outputStream for each message in inputStream. This works for most messages, but I lose events in one particular case: if I receive two messages with the same key within a short interval (less than 5 seconds), only the second event appears in outputStream.
I think this is because the KTable updates are applied in batches, but I can't find any configuration or documentation related to this. Is that the reason for the missing events, and do you know how to change the configuration so that I do not lose any messages?
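One candidate explanation, offered as an assumption rather than something confirmed for this setup: Kafka Streams keeps a record cache in front of state stores, and consecutive updates to the same key can be compacted in that cache before they are forwarded downstream. The cache is flushed when it fills up or when the application commits. The sketch below shows the two settings usually involved, `cache.max.bytes.buffering` and `commit.interval.ms`, written as plain string keys in a `java.util.Properties` so that it runs without the Kafka libraries. The class name, application id, broker address, and the 100 ms value are hypothetical placeholders.

```java
import java.util.Properties;

public class NoCacheConfigSketch {

    // Builds Kafka Streams properties with the record cache disabled,
    // so that every KTable update should be forwarded downstream.
    static Properties buildProps() {
        Properties props = new Properties();
        // Hypothetical placeholder values for illustration only.
        props.put("application.id", "ktable-changelog-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // A cache size of 0 bytes turns off record caching.
        props.put("cache.max.bytes.buffering", "0");
        // Optional: commit (and flush) more often; 100 ms is an arbitrary choice.
        props.put("commit.interval.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        buildProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` would be passed to the `KafkaStreams` constructor together with the topology. Newer Kafka releases also offer `statestore.cache.max.bytes` as the replacement name for the cache setting, so the key to use may depend on the version in play.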