
I am a beginner to Kafka Streams (KStreams). I went through the documentation, but I cannot seem to grasp the difference between stateless and stateful operations. A simple explanation with an example would be much appreciated.



Aggregations and joins require state: an initial accumulator or grouping that is carried through the topology.

Filtering, branching, mapping, or iterating over the stream require no state: one message comes in, and zero or one messages come out.

It is worth pointing out that the groupBy functions are considered stateless.
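To make the stateless side concrete outside of Kafka, here is a minimal plain-Java sketch. It is not the Kafka Streams API; the class and method names are hypothetical. It models a filter fused with a map that looks only at the current record and emits zero or one record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Conceptual sketch in plain Java (not the Kafka Streams API).
// A stateless operator is a pure function of the current record.
class StatelessSketch {

    // filter + map fused into one step: each input yields zero or one output.
    static Optional<String> filterThenMap(String value) {
        if (value.isEmpty()) {
            return Optional.empty();                 // filtered out: zero outputs
        }
        return Optional.of(value.toUpperCase());     // mapped: exactly one output
    }

    // Applies the operator record by record; the operator keeps nothing
    // between calls (the list only collects output for printing).
    static List<String> process(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String v : input) {
            filterThenMap(v).ifPresent(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("a", "", "b", "a"))); // [A, B, A]
    }
}
```

Because filterThenMap has no fields, calling it on "a" gives the same result no matter which records came before, so no state store is needed to run it.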

OneCricketeer
  • Do you know why join operations are stateful? What kind of data should be stored internally? – kolobok Aug 31 '23 at 14:00
  • 1
    Joins in KStreams typically require some type of time-window, so all data within that window is collected into memory, then compared against values in another topic within the same time frame. – OneCricketeer Aug 31 '23 at 21:09
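The buffering described in that comment can be modelled roughly as follows. This is a simplified sketch of a windowed inner join, assuming a symmetric window of windowMs on event timestamps; the Event record and class are hypothetical, and this is not how Kafka Streams implements joins internally (it uses window stores).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a windowed stream-stream inner join (not Kafka code).
// Each side's records are buffered so that later records from the other
// side can be matched against them: that buffer is the join's state.
class WindowedJoinSketch {

    record Event(String side, String key, String value, long ts) {}

    private final long windowMs;
    private final List<Event> leftBuffer = new ArrayList<>();
    private final List<Event> rightBuffer = new ArrayList<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Processes one event and returns the join results it produces.
    List<String> process(Event e) {
        boolean isLeft = e.side().equals("left");
        List<Event> own = isLeft ? leftBuffer : rightBuffer;
        List<Event> other = isLeft ? rightBuffer : leftBuffer;
        List<String> results = new ArrayList<>();
        for (Event o : other) {
            // Same key and timestamps within the window: emit a joined result.
            if (o.key().equals(e.key()) && Math.abs(o.ts() - e.ts()) <= windowMs) {
                String l = isLeft ? e.value() : o.value();
                String r = isLeft ? o.value() : e.value();
                results.add(e.key() + ":" + l + "+" + r);
            }
        }
        own.add(e); // remember this event for future matches (the state)
        return results;
    }

    public static void main(String[] args) {
        WindowedJoinSketch join = new WindowedJoinSketch(1000);
        System.out.println(join.process(new Event("left", "k1", "order", 0)));      // []
        System.out.println(join.process(new Event("right", "k1", "payment", 500))); // [k1:order+payment]
        System.out.println(join.process(new Event("right", "k1", "late", 5000)));   // []
    }
}
```

A real implementation would also evict buffered records once they fall outside the window (plus any grace period); this sketch keeps them forever for brevity.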

What OneCricketeer said is correct. I am posting this answer just to explain it with an example. In a nutshell, stateful operations depend on previous events of the stream, whereas stateless operations do not. Take this example, which counts events:

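import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// Count is a small POJO (key + count) and log a logger field, both defined elsewhere.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = stream
      .peek((key, value) -> log.info("received key: {}, value: {}", key, value))
      // Stateless: placeholder predicate, substitute your own condition
      .filter((key, value) -> value != null)
      // Stateless: groups records by their existing key, keeps no state itself
      .groupByKey()
      // Stateful: the running Count per key is kept in a state store
      .aggregate(new Initializer<Count>() {
                    @Override
                    public Count apply() {
                        return new Count("", 0);
                    }
                }, new Aggregator<String, String, Count>() {
                    @Override
                    public Count apply(String k, String v, Count aggKeyCount) {
                        Integer currentCount = aggKeyCount.getCount();
                        return new Count(k, currentCount + 1);
                    }
                },
                // countSerde: a Serde<Count> you must provide (not shown)
                Materialized.with(Serdes.String(), countSerde));

aggregate.toStream()
         .map((k, v) -> new KeyValue<>(k, v.getCount()))
         .peek((key, value) -> log.info("emitting key: {}, value: {}", key, value))
         .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));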

The operations groupByKey, aggregate, filter and map transform the events in some way (peek does not transform them; it only observes each event). The transformations groupByKey, filter and map are stateless because each one looks only at the event it is currently processing and does not depend on previous events. The aggregate transformation counts the number of events per key, so it has to sum them up and remember the count so far. Therefore it depends on previous events. Note that it has an Initializer of Count("", 0) and aggregates events one by one with Count(k, currentCount + 1), which is based on the previous counter aggKeyCount.
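The initializer/aggregator pair can be mimicked in plain Java to show where the state lives. In this sketch the HashMap plays the role of the state store, Count mirrors the answer's class as a record, and Aggregator is a local stand-in interface, not org.apache.kafka.streams.kstream.Aggregator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of groupByKey().aggregate(initializer, aggregator).
// The HashMap stands in for the state store; this is not Kafka code.
class AggregateSketch {

    // Mirrors the answer's Count class (key + running count).
    record Count(String key, int count) {}

    // Local stand-in for an aggregator: (key, value, previous) -> next.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    private final Supplier<Count> initializer = () -> new Count("", 0);
    private final Aggregator<String, String, Count> aggregator =
            (k, v, aggKeyCount) -> new Count(k, aggKeyCount.count() + 1);

    // State: the latest aggregate for every key seen so far.
    private final Map<String, Count> store = new HashMap<>();

    // Processes one record and returns the updated aggregate for its key,
    // similar to the update a KTable emits downstream.
    Count process(String key, String value) {
        Count previous = store.containsKey(key) ? store.get(key) : initializer.get();
        Count updated = aggregator.apply(key, value, previous);
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        AggregateSketch counter = new AggregateSketch();
        System.out.println(counter.process("a", "x")); // Count[key=a, count=1]
        System.out.println(counter.process("b", "y")); // Count[key=b, count=1]
        System.out.println(counter.process("a", "z")); // Count[key=a, count=2]
    }
}
```

The third call returns 2 only because the store remembered the first "a" event; remove the map and every call would return 1, which is exactly the difference between stateful and stateless.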

Moreover, the concept of stateless and stateful operations is not specific to Kafka Streams. It applies to processing engines such as Hadoop MapReduce, Apache Spark, Apache Flink and Apache Storm, and to any processing pipeline, such as Java Streams (e.g., map, reduce, flatMap, filter), Akka Streams and Project Reactor.
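Java Streams draws the same line: the java.util.stream package documentation classifies filter and map as stateless intermediate operations and distinct and sorted as stateful ones. A small sketch (the class name is hypothetical), with a groupingBy/counting collector playing a role similar to groupByKey().count():

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class JavaStreamsSketch {

    // Stateless: each element is filtered and mapped on its own.
    static List<Integer> lengthsOfNonEmpty(List<String> words) {
        return words.stream()
                .filter(w -> !w.isEmpty())
                .map(String::length)
                .collect(Collectors.toList());
    }

    // Stateful intermediate op: distinct() must remember elements already seen.
    static List<String> distinctWords(List<String> words) {
        return words.stream().distinct().collect(Collectors.toList());
    }

    // Accumulation across elements, similar in spirit to groupByKey().count().
    static Map<String, Long> countByWord(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> words = List.of("kafka", "", "streams", "kafka");
        System.out.println(lengthsOfNonEmpty(words)); // [5, 7, 5]
        System.out.println(distinctWords(words));     // [kafka, , streams]
        System.out.println(countByWord(words));       // {=1, kafka=2, streams=1}
    }
}
```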

Felipe