
My question is about Kafka Streams KTable.groupBy.aggregate and the resulting aggregated values.

situation

I am trying to aggregate minute events per day.

I have a minute event generator (not shown here) that generates events for a few houses. Sometimes the event value is wrong and the minute event must be republished. Minute events are published in the topic "minutes".

I am doing an aggregation of these events per day and house using kafka Streams groupBy and aggregate.

problem

Normally, since there are 1440 minutes in a day, there should never be an aggregation with more than 1440 values. There should also never be an aggregation with a negative number of events.

... But it happens anyway, and we do not understand what is wrong in our code.

sample code

Here is simplified sample code to illustrate the problem. The IllegalStateException is thrown sometimes.


        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, MinuteEvent> minuteEvents = builder.table(
                "minutes",
                Consumed.with(Serdes.String(), minuteEventSerdes),
                Materialized.<String, MinuteEvent, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), minuteEventSerdes)
                        .withCachingDisabled());

        // perform daily aggregation
        KStream<String, MinuteAggregate> dayEvents = minuteEvents
                // group by house and day
                .filter((key, minuteEvent) -> minuteEvent != null && StringUtils.isNotBlank(minuteEvent.house))
                .groupBy((key, minuteEvent) -> KeyValue.pair(
                        minuteEvent.house + "##" + minuteEvent.instant.atZone(ZoneId.of("Europe/Paris")).truncatedTo(ChronoUnit.DAYS), minuteEvent),
                        Grouped.<String, MinuteEvent>as("minuteEventsPerHouse")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(minuteEventSerdes))
                .aggregate(
                        MinuteAggregate::new,
                        (String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.addLine(key, value),
                        (String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.removeLine(key, value),
                        Materialized
                                .<String, MinuteAggregate, KeyValueStore<Bytes, byte[]>>as(BILLLINEMINUTEAGG_STORE)
                                .withKeySerde(Serdes.String())
                                .withValueSerde(minuteAggSerdes)
                                .withLoggingEnabled(new HashMap<>())) // keep this aggregate state forever
                .toStream();

        // check daily aggregation
        dayEvents.filter((key, value) -> {
            if (value.nbValues < 0) {
                throw new IllegalStateException("got an aggregate with a negative number of values " + value.nbValues);
            }
            if (value.nbValues > 1440) {
                throw new IllegalStateException("got an aggregate with too many values " + value.nbValues);
            }
            return true;
        }).to("days", minuteAggSerdes);

and here are the sample classes used in this code snippet:

    public class MinuteEvent {
        public final String house;
        public final double sensorValue;
        public final Instant instant;

        public MinuteEvent(String house, double sensorValue, Instant instant) {
            this.house = house;
            this.sensorValue = sensorValue;
            this.instant = instant;
        }
    }

    public class MinuteAggregate {
        public int nbValues = 0;
        public double totalSensorValue = 0.;
        public String house = "";

        public MinuteAggregate addLine(String key, MinuteEvent value) {
            this.nbValues = this.nbValues + 1;
            this.totalSensorValue = this.totalSensorValue + value.sensorValue;
            this.house = value.house;
            return this;
        }

        public MinuteAggregate removeLine(String key, MinuteEvent value) {
            this.nbValues = this.nbValues - 1;
            this.totalSensorValue = this.totalSensorValue - value.sensorValue;
            return this;
        }

        public MinuteAggregate() {
        }
    }

If someone could tell us what we are doing wrong here and why we have these unexpected values, that would be great.

additional notes

  • we configure our streams job to run with 4 threads: properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
  • we are forced to use KTable.groupBy().aggregate() because minute values can be republished with a different sensorValue for an already published Instant, and the daily aggregation must be updated accordingly. KStream.groupBy().aggregate() does not have both an adder AND a subtractor.
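The need for a subtractor in the last bullet can be illustrated without Kafka. The sketch below is a hypothetical plain-Java simulation (the class RepublishDemo and the map latestPerMinute are invented for illustration, not part of the question's code): when a minute is republished with a corrected sensorValue, the daily total must first subtract the old value, exactly what KGroupedTable.aggregate's subtractor does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of why republishing a minute needs a subtractor:
// the daily total must drop the old sensorValue before adding the new one.
public class RepublishDemo {
    public static void main(String[] args) {
        Map<String, Double> latestPerMinute = new HashMap<>(); // plays the role of the "minutes" KTable
        double dailyTotal = 0.0;                               // plays the role of the downstream aggregate

        // first publication for minute "m1"
        Double old = latestPerMinute.put("m1", 10.0);
        if (old != null) dailyTotal -= old;  // subtractor (no-op on first publish)
        dailyTotal += 10.0;                  // adder

        // republished, corrected value for the same minute
        old = latestPerMinute.put("m1", 12.5);
        if (old != null) dailyTotal -= old;  // subtractor removes the stale 10.0
        dailyTotal += 12.5;                  // adder adds the corrected value

        System.out.println(dailyTotal);      // prints 12.5, not 22.5
    }
}
```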
Antonin

1 Answer


I think it is actually possible for the count to become negative temporarily.

The reason is that each update to your first KTable sends two messages downstream -- the old value to be subtracted from the downstream aggregation and the new value to be added to it. Both messages are processed independently in the downstream aggregation.

If the current count is zero and the subtraction is processed before the addition, the count becomes negative temporarily.
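This interleaving can be reproduced without Kafka. The minimal sketch below (the demo class is hypothetical; only the field and method names mirror the question's MinuteAggregate) applies the subtractor before the adder, exactly the ordering described above:

```java
// Stand-in for the question's aggregate, reduced to the count field.
class MinuteAggregate {
    int nbValues = 0;
    MinuteAggregate addLine()    { nbValues++; return this; } // adder
    MinuteAggregate removeLine() { nbValues--; return this; } // subtractor
}

public class NegativeCountDemo {
    public static void main(String[] args) {
        MinuteAggregate agg = new MinuteAggregate();

        // An update to the upstream KTable emits two records: <old, subtract>
        // and <new, add>. If the subtraction is processed first while the
        // current count is 0, the intermediate result is negative.
        agg.removeLine();
        System.out.println(agg.nbValues);   // prints -1 (transient state)

        agg.addLine();
        System.out.println(agg.nbValues);   // prints 0 (final, correct)
    }
}
```

So the IllegalStateException in the checking filter fires on a transient intermediate aggregate, not on a corrupted final one.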

Matthias J. Sax