
I have been struggling with this for a couple of days. I am consuming records from four topics and need to aggregate them over a time window (TimeWindows). When the window's time is up, I want to send either an "approved" or a "not approved" message to a sink topic. Is this possible with Kafka Streams?

It seems to send every record to the new topic even though the window is still open, which is not what I want.

Here is the simple code:

builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
        .flatMap(new ExceptionSafeKeyValueMapper<String, FooTriggerMessage>("", Serdes.String(), fooTriggerSerde))
        .filter((key, value) -> value.getTriggerEventId() != null)
        .groupBy((key, value) -> value.getTriggerEventId().toString(),
                Serialized.with(Serdes.String(), fooTriggerSerde))
        // Tumbling 30-second window (advance == size).
        .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
                .advanceBy(TimeUnit.SECONDS.toMillis(30)))
        .aggregate(
                () -> new BarApprovalMessage(),                              /* initializer */
                (key, value, aggValue) -> getApproval(key, value, aggValue), /* adder */
                Materialized
                        .<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(storeName) /* state store name */
                        .withValueSerde(barApprovalSerde))
        // Every update to the windowed KTable is forwarded here, not only the final one.
        .toStream()
        .to(appProperties.getBarApprovalEngineOutgoing(),
                Produced.with(windowedSerde, barApprovalSerde));

Right now every record is sent to the outgoing topic. I only want one message to be sent when the window closes, so to speak.

Is this possible?

SwissArmyKnife
  • Possible duplicate of [How to send final kafka-streams aggregation result of a time windowed KTable?](https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable) – Matthias J. Sax Mar 01 '18 at 02:20

3 Answers

I'm answering my own question in case anyone else needs it. In the transform stage, I used the ProcessorContext to schedule a punctuation. context.schedule takes three parameters: the punctuation interval, which notion of time to use (wall-clock time or stream time), and a Punctuator (the callback invoked when that time is reached). I used wall-clock time and started a new schedule for each unique window key. In transform, I add each message to a KeyValue store and return null, so nothing is forwarded yet. Then, in the callback that runs every 30 seconds, I check that the window is closed, iterate over the messages in the store, aggregate them, and call context.forward and context.commit. Voilà! Four messages received in a 30-second window, one message produced.
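The buffer-then-flush idea above can be sketched without any Kafka dependency. The following is a minimal plain-Java model, not the author's code and not the Kafka API: the hypothetical class WindowFlushSketch stands in for the transformer, onRecord plays the role of transform() (store the message, emit nothing), and punctuate plays the role of the wall-clock Punctuator that emits one aggregate per closed window. Opening a key's window at the arrival time of its first message, and using a simple count as the "aggregate", are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the "buffer, then flush on punctuation" pattern.
// Not Kafka API: onRecord() plays the role of transform(), punctuate() the wall-clock Punctuator.
class WindowFlushSketch {
    private final long windowSizeMs;
    // Buffered messages per window key (stands in for the KeyValue state store).
    private final Map<String, List<String>> buffer = new TreeMap<>();
    // Assumption: a key's window opens when its first message arrives (wall-clock time).
    private final Map<String, Long> windowEnd = new TreeMap<>();

    WindowFlushSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Like transform() returning null: store the message, emit nothing downstream.
    void onRecord(String key, String message, long nowMs) {
        windowEnd.putIfAbsent(key, nowMs + windowSizeMs);
        buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
    }

    // Like the Punctuator: for every window that has closed, emit exactly one aggregate
    // (here "key:count", a placeholder for the real approval logic) and clear that key's state.
    List<String> punctuate(long nowMs) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = windowEnd.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs >= entry.getValue()) {
                List<String> messages = buffer.remove(entry.getKey());
                emitted.add(entry.getKey() + ":" + messages.size());
                it.remove();
            }
        }
        return emitted;
    }
}
```

For example, four calls to onRecord("order-1", ...) within 30 seconds followed by punctuate at the 30-second mark yield the single result "order-1:4". In real Kafka Streams code, the buffer would be a state store, punctuate would be registered with context.schedule(..., PunctuationType.WALL_CLOCK_TIME, ...), and each result would be sent with context.forward.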

SwissArmyKnife
  • Care to share any example code for this? It would be extremely helpful; I'm working on a similar use case for session windows. – glockman Jun 20 '18 at 15:11
  • I wrote a blog post about it. It's in Swedish, but I guess the code parts will make sense anyway. If they don't, I can try to post it! https://www.google.se/amp/s/cygni.se/oppna-fonster-med-kafka-streams/amp/ – SwissArmyKnife Jun 20 '18 at 15:15
  • Thanks so much! Most of it made sense, working through slowly as my use case is a bit different (can't schedule a fixed wait as session may not be complete). Where does "schedulers" come from in your example though? I wasn't sure what the type is and didn't see it declared anywhere. – glockman Jun 21 '18 at 09:48
  • Actually same question for sentIdsList, which I assume you pass in for the map of what's already sent. Is that a state store, or does it come from something different? – glockman Jun 21 '18 at 09:55
  • schedulers holds Cancellables; I declare it in the transformer. Use context.schedule to punctuate at a given interval. sentIdsList is a global variable, but I don't think you need to know about that one :) – SwissArmyKnife Jun 24 '18 at 16:50

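You can use the suppress() operator (added in Kafka 2.1) with Suppressed.untilWindowCloses(...), so that a windowed aggregation emits only its final result once the window closes.

From the official Kafka Streams developer guide:

https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results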
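To make these semantics concrete, here is a hypothetical plain-Java model of a tumbling-window count followed by untilWindowCloses (SuppressSketch is not part of Kafka). Intermediate updates are held back, each window's final result is emitted exactly once when stream time reaches window end plus grace period, and records arriving for an already closed window are dropped. It assumes stream time is simply the largest record timestamp seen so far, which simplifies how Kafka tracks it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical plain-Java model of a tumbling-window count followed by
// suppress(Suppressed.untilWindowCloses(...)). Not the Kafka implementation.
class SuppressSketch {
    private final long sizeMs;
    private final long graceMs;
    // Stream time: the highest record timestamp observed so far (event time, not wall clock).
    private long streamTime = Long.MIN_VALUE;
    // Open windows: window start -> running count (suppressed, not yet emitted).
    private final TreeMap<Long, Integer> open = new TreeMap<>();

    SuppressSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Processes one record and returns the final results ("windowStart=count")
    // of every window that this record caused to close.
    List<String> onRecord(long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        streamTime = Math.max(streamTime, timestampMs);
        if (start + sizeMs + graceMs > streamTime) {
            open.merge(start, 1, Integer::sum); // window still open: update silently
        }                                       // otherwise: late record, dropped
        List<String> emitted = new ArrayList<>();
        // A window closes once stream time reaches window end + grace.
        while (!open.isEmpty() && open.firstKey() + sizeMs + graceMs <= streamTime) {
            long first = open.firstKey();
            emitted.add(first + "=" + open.remove(first));
        }
        return emitted;
    }
}
```

Because closing is driven by stream time, a window is only flushed when a later record advances stream time; if the input goes quiet, the last window's result stays buffered.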

Simon

I faced the same issue and solved it by adding grace(0) to the fixed window and using the Suppressed API:

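import static org.apache.kafka.common.serialization.Serdes.String;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {
    buildAggregateMetricsBySensor(stream)
            .to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));
}

private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(
        KStream<SensorKeyDTO, SensorDataDTO> stream) {
    return stream
            // Re-key by sensor id so each sensor gets its own windows.
            .map((key, val) -> new KeyValue<>(val.getId(), val))
            .groupByKey(Grouped.with(String(), new SensorDataSerde()))
            // Tumbling window with zero grace: it closes as soon as stream time passes its end.
            .windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
            .aggregate(SensorAggregateMetricsDTO::new,
                    (String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
                    buildWindowPersistentStore())
            // Hold back intermediate updates; emit one final result per window once it closes.
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            // Drop the window from the key: Windowed<String> -> String.
            .map((key, value) -> KeyValue.pair(key.key(), value));
}

private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
    return Materialized
            .<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
            .withKeySerde(String())
            .withValueSerde(new SensorAggregateMetricsSerde());
}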

Here you can see the result:

[Image: screenshot of the resulting output]

Sergio Sánchez Sánchez
  • Isn't this still going to work on event time? What if we want to suppress based on wall-clock time? – AK4647 Feb 28 '21 at 20:28