The code below processes only the first message that arrives and publishes it correctly, but no further messages are processed afterwards. (I'm using kafka-console-consumer.bat in a terminal to monitor the messages published to total-amount-by-id.)

Kafka Streams:

// Read the input topic as (String key, String value) records.
KStream<String, String> totalAmount = builder.stream("data-consumed", Consumed.with(Serdes.String(), Serdes.String()));

totalAmount
  .mapValues(v -> Integer.valueOf(v))
  .groupByKey()
  .windowedBy(TimeWindows.of(Duration.ofMillis(100)))
  .aggregate(
        () -> 0,
        (key, value, aggregate) -> {
            System.out.println("value: " + value);
            System.out.println("aggregate: " + aggregate);
            return value + aggregate;
        },
        Materialized.with(Serdes.String(), Serdes.Integer())
  )
  .toStream()
  // Drop the window from the key, keeping only the original record key.
  .map((key, aggregate) -> new KeyValue<>(key.key(), aggregate))
  .to("total-amount-by-id", Produced.with(Serdes.String(), Serdes.Integer()));

Tests:

  • I'm publishing one message every 100 ms, always with the same key, to the topic "data-consumed"
  • The first four (k,v) pairs published to the topic "data-consumed" were: (1,1), (1,2), (1,4), (1,1)
  • Kafka Streams published (1,1) to "total-amount-by-id", but nothing else came after that
  • The System.out.println() calls in the code above printed only: "value: 1", "aggregate: 0", "value: 2", "aggregate: 0"

Any idea what is causing this problem?

*I was expecting the second aggregate to be equal to 1 (aggregate: 1).

jimmy
  • If you want to aggregate all values, then discard the key (set some single default value). Or, iterate over your table via interactive query and sum the values there – OneCricketeer Aug 24 '21 at 14:46
  • The keys are always the same, so no issues with that, right? Looks like the materialization is not working properly? – jimmy Aug 25 '21 at 03:30
  • Well, your time window is only 100ms, and you're only sending one event in those windows, so I'm not sure what you're expecting it to accumulate. Have you been able to follow the wordcount examples? By the way, you can use TopologyTestDriver to run your code and control the clock rather than use an actual running broker or cli tools – OneCricketeer Aug 25 '21 at 03:50
  • I agree with your point about the length of the windows, but it should print/process the other messages, right? I've already tested with 10s windows and behaviour is the same. I'll try what you suggested. – jimmy Aug 25 '21 at 11:22
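
To illustrate the windowing point raised in the comments, here is a minimal plain-Java sketch with no Kafka dependency. It assumes epoch-aligned tumbling windows, which is how Kafka Streams tumbling windows are documented to behave, and hypothetical record timestamps of 0, 100, 200 and 300 ms. The class and method names are made up for illustration. It only shows why "aggregate: 0" can be printed for the second message when each record lands in its own 100 ms window; it does not explain why output stops after the first record.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Sums the values per window. A window seen for the first time starts
    // from the record's value, which matches an initializer of 0 plus the value.
    static Map<Long, Integer> aggregate(long[] timestampsMs, int[] values, long windowSizeMs) {
        Map<Long, Integer> totals = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            long start = windowStart(timestampsMs[i], windowSizeMs);
            totals.merge(start, values[i], Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 100, 200, 300}; // hypothetical timestamps, 100 ms apart
        int[] values = {1, 2, 4, 1};            // the values from the question

        // 100 ms windows: every record is alone in its window.
        System.out.println(aggregate(timestamps, values, 100));    // {0=1, 100=2, 200=4, 300=1}
        // 10 s windows: all four records share one window.
        System.out.println(aggregate(timestamps, values, 10_000)); // {0=8}
    }
}
```

With 100 ms windows nothing accumulates across records, while a 10 s window would sum to 8 for these four values. If a real run with 10 s windows still shows "aggregate: 0" every time, the cause is likely something other than the window size.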
