
I have the following Kafka Streams processing in my application:

myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L).advanceBy(10000L))  // 10-second tumbling windows
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    .foreach((k, v) -> { /* do something very important */ });

Expected behavior: incoming messages are grouped by key and, within each time interval, aggregated into a CustomCollectorObject. CustomCollectorObject is just a class with a List inside. Every 10 seconds, foreach does something very important with the aggregated data. Crucially, I expect foreach to be called every 10 seconds!

Actual behavior: foreach is called less often, roughly every 30-35 seconds (the exact delay doesn't matter much). What is very important: I receive 3-4 messages at once.

The question is: how can I achieve the expected behavior? I need my data to be processed at runtime, without any delays.

I've tried setting cache.max.bytes.buffering: 0, but in that case windowing doesn't work at all.
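For reference, the setting was applied roughly like this (a sketch; only the property value matters):

Properties props = new Properties();
// Disable the record cache so every update is forwarded downstream immediately.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);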

Marian

1 Answer


Kafka Streams has a different execution model and provides different semantics, i.e., your expectations don't match what Kafka Streams does. There are multiple similar questions already.

Also note that the community is currently working on a new operator called suppress() that will be able to provide the semantics you want: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
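Based on the KIP draft, usage might eventually look roughly like this (the operator and its Suppressed configuration are not released yet, so treat this as a sketch that may change):

myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L))
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    // Hold back intermediate updates; emit one final result when the window closes.
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach((k, v) -> { /* do something very important */ });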

For now, you would need to add a transform() with a state store and use punctuations to get the semantics you want (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a-stream-processor). A sketch follows.
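This is a minimal sketch of that approach. The store name "aggregate-store" and the value type CustomValue (whatever customTransformer::transform returns) are placeholders you would adapt; punctuation fires on wall-clock time every 10 seconds regardless of traffic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Accumulates values per key in a state store and emits every aggregate
// exactly once per punctuation interval, independent of incoming traffic.
public class AggregateEmitter
        implements Transformer<String, CustomValue, KeyValue<String, CustomCollectorObject>> {

    private ProcessorContext context;
    private KeyValueStore<String, CustomCollectorObject> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, CustomCollectorObject>) context.getStateStore("aggregate-store");
        // Fire every 10 seconds of wall-clock time.
        context.schedule(10000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (final KeyValueIterator<String, CustomCollectorObject> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, CustomCollectorObject> entry = it.next();
                    context.forward(entry.key, entry.value);  // one emission per key per interval
                    store.delete(entry.key);                  // start the next interval fresh
                }
            }
        });
    }

    @Override
    public KeyValue<String, CustomCollectorObject> transform(final String key, final CustomValue value) {
        // Accumulate only; emission happens in the punctuation above.
        CustomCollectorObject aggregate = store.get(key);
        if (aggregate == null) {
            aggregate = new CustomCollectorObject();
        }
        store.put(key, aggregate.collect(value));
        return null;  // suppress per-record output
    }

    @Override
    public void close() {}
}

Wiring it in replaces the windowed aggregation (builder is your StreamsBuilder):

builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("aggregate-store"),
        Serdes.String(), new CustomCollectorSerde()));

myStream
    .mapValues(customTransformer::transform)
    .transform(AggregateEmitter::new, "aggregate-store")
    .foreach((k, v) -> { /* do something very important */ });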

Matthias J. Sax
  • Thanks for your response! Looks like `suppress()` is really what I'm looking for. For the time being, I've fixed my situation in the following way: 1. set `commit.interval.ms` to the same value as the window duration; 2. added a filter after converting to a stream, to check whether the window is complete, and if so, do the important work. – Marian Sep 26 '18 at 11:40
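A sketch of that workaround, under the assumption that window completeness is judged against wall-clock time (the windowed key exposes the window bounds, so the filter compares each window's end to the current time):

Properties props = new Properties();
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);  // match the window size

myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L))
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    // Only act on windows whose end has already passed.
    .filter((windowedKey, aggregate) -> windowedKey.window().end() <= System.currentTimeMillis())
    .foreach((windowedKey, aggregate) -> { /* do something very important */ });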