I have Kafka Streams processing in my application:
myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
    .aggregate(CustomCollectorObject::new,
        (key, value, aggregate) -> aggregate.collect(value),
        Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
            .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    .foreach((k, v) -> /* do something very important */);
Expected behavior: incoming messages are grouped by key and, within each time window, aggregated into a CustomCollectorObject. CustomCollectorObject is just a class with a List inside. Every 10 seconds, in foreach, I do something very important with the aggregated data. Most importantly, I expect foreach to be called every 10 seconds!
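To make the expected grouping concrete, here is a minimal plain-Java sketch of how records would fall into 10-second tumbling windows by timestamp. It does not use the Kafka Streams API; the class and method names (TumblingWindowSketch, windowStart, collect) are hypothetical, and it assumes non-negative timestamps in milliseconds and windows aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    // Assumes timestampMs >= 0 and sizeMs > 0.
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Collects values into per-window lists, keyed by window start.
    // timestampsMs[i] is the timestamp of values[i].
    public static Map<Long, List<String>> collect(long[] timestampsMs, String[] values, long sizeMs) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            long start = windowStart(timestampsMs[i], sizeMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 9_999L, 10_000L, 25_000L};
        String[] values = {"a", "b", "c", "d"};
        // prints {0=[a, b], 10000=[c], 20000=[d]}
        System.out.println(collect(timestamps, values, 10_000L));
    }
}
```

In this sketch each list plays the role of the List inside CustomCollectorObject: one collection per key and per 10-second window.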
Actual behavior: foreach is called less often, approximately every 30-35 seconds (the exact interval doesn't matter much). What matters is that I receive 3-4 messages at once.
The question is: how can I achieve the expected behavior? I need my data to be processed at runtime without any delays.
I've tried setting cache.max.bytes.buffering: 0, but in that case windowing doesn't work at all.
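For reference, a setting like this is typically passed in the Properties object handed to the streams application. The following is only a sketch using plain string keys; the application.id and bootstrap.servers values and the class name StreamsConfigSketch are hypothetical placeholders, not taken from the application above.

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Builds a configuration with record caching disabled.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // hypothetical value
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical value
        // The setting from the question: 0 bytes of cache across all threads.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("cache.max.bytes.buffering"));
    }
}
```

With the cache disabled, each incoming record forwards an updated aggregate downstream, which may be why it looks as though windowing is not applied.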