I am a beginner with Kafka Streams (KStreams). I went through the documentation, but I cannot seem to grasp the difference between stateful and stateless operations. A simple explanation with an example would be much appreciated.
2 Answers
Aggregation and joins require state - an initial accumulator or grouping that's carried through the topology
Filtering, branching, mapping or iterating the stream require no state - one message comes in, zero or one messages come out
Worth pointing out that the groupBy functions are considered stateless

Do you know why join operations are stateful? What kind of data should be stored internally? – kolobok Aug 31 '23 at 14:00
Joins in KStreams typically require some type of time-window, so all data within that window is collected into memory, then compared against values in another topic within the same time frame. – OneCricketeer Aug 31 '23 at 21:09
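To make the comment above concrete, here is a minimal plain-Java sketch of why a windowed join needs state. It is not the Kafka Streams API: the class `WindowedJoinSketch`, the `Event` type, and the methods `onLeft` and `onRight` are all hypothetical names made up for illustration, and eviction of expired events is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these types come from Kafka Streams.
final class WindowedJoinSketch {

    // Hypothetical event type: key, value, and an event timestamp in milliseconds.
    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long windowMs;
    // The state: every event seen so far on each side, grouped by key.
    // A real implementation would also evict events older than the window.
    private final Map<String, List<Event>> leftBuffer = new HashMap<>();
    private final Map<String, List<Event>> rightBuffer = new HashMap<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Called for each event on the left stream; returns the joined results.
    List<String> onLeft(Event e) {
        return join(e, leftBuffer, rightBuffer, true);
    }

    // Called for each event on the right stream; returns the joined results.
    List<String> onRight(Event e) {
        return join(e, rightBuffer, leftBuffer, false);
    }

    private List<String> join(Event e, Map<String, List<Event>> own,
                              Map<String, List<Event>> other, boolean isLeft) {
        // Remember this event so that later events from the other side can match it.
        own.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);

        List<String> joined = new ArrayList<>();
        for (Event o : other.getOrDefault(e.key, Collections.<Event>emptyList())) {
            // Only events with the same key and close enough in time are joined.
            if (Math.abs(e.timestampMs - o.timestampMs) <= windowMs) {
                joined.add(isLeft ? e.value + "+" + o.value : o.value + "+" + e.value);
            }
        }
        return joined;
    }
}
```

The two buffers are the state: without them, an event arriving on one side would have nothing from the other side to be compared against.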
What OneCricketeer said is correct. I am posting this answer just to explain with an example. In a nutshell, stateful operations depend on previous events of the stream, whereas stateless operations do not. So, take this example that counts events:
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream(INPUT_TOPIC);
    // Count is this answer's own class; it needs a Serde (default or via Materialized).
    KTable<String, Count> aggregate = stream
        // peek: stateless, only logs the current event
        .peek((key, value) -> log.info("received key: {}, value: {}", key, value))
        // filter: stateless, decides on the current event alone
        .filter((key, value) -> /* filter events with value is ____ */)
        // groupByKey: stateless, only prepares the grouping
        .groupByKey()
        // aggregate: stateful, keeps one Count per key between events
        .aggregate(new Initializer<Count>() {
            @Override
            public Count apply() {
                return new Count("", 0);
            }
        }, new Aggregator<String, String, Count>() {
            @Override
            public Count apply(String k, String v, Count aggKeyCount) {
                Integer currentCount = aggKeyCount.getCount();
                return new Count(k, currentCount + 1);
            }
        });
    aggregate.toStream()
        // map: stateless, converts the current Count to its number
        .map((k, v) -> new KeyValue<>(k, v.getCount()))
        .peek((key, value) -> log.info("emitting key: {}, value: {}", key, value))
        .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
The operations groupByKey, aggregate, filter, and map transform the events in some way (peek does not transform them). The transformations groupByKey, filter, and map are stateless because they only look at the event they are currently processing; they do not care about previous events. The aggregate transformation counts the events, so it depends on the previous events. Note that it has an Initializer of Count("", 0) and aggregates events one by one into Count(k, currentCount + 1), which is based on the previous counter aggKeyCount.
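The way the aggregate step folds events into a per-key counter can be traced in plain Java. This is only a sketch under assumptions: the `Count` class below is a guess at what the class in this answer looks like, the nested `Aggregator` interface is a hypothetical stand-in for the Kafka Streams one, and a `HashMap` plays the role of the state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java illustration only; none of these types come from Kafka Streams.
final class AggregateSketch {

    // Assumed shape of the Count class used in the answer.
    static final class Count {
        private final String key;
        private final int count;

        Count(String key, int count) {
            this.key = key;
            this.count = count;
        }

        String getKey() {
            return key;
        }

        int getCount() {
            return count;
        }
    }

    // Hypothetical stand-in for the Aggregator interface.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Stand-in for the state store: the latest Count per key.
    private final Map<String, Count> store = new HashMap<>();
    private final Supplier<Count> initializer = () -> new Count("", 0);
    private final Aggregator<String, String, Count> aggregator =
            (k, v, aggKeyCount) -> new Count(k, aggKeyCount.getCount() + 1);

    // Processes one event and returns the updated Count for its key.
    Count process(String key, String value) {
        // First event for a key starts from the initializer; later ones reuse the stored Count.
        Count previous = store.containsKey(key) ? store.get(key) : initializer.get();
        Count updated = aggregator.apply(key, value, previous);
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        AggregateSketch sketch = new AggregateSketch();
        System.out.println(sketch.process("a", "x").getCount()); // 1
        System.out.println(sketch.process("a", "y").getCount()); // 2
        System.out.println(sketch.process("b", "z").getCount()); // 1
    }
}
```

The second event for key "a" yields 2 only because the result of the first one was kept in the store; that remembered value is what makes the operation stateful.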
Moreover, the concept of stateless and stateful operations is not specific to KStream in Kafka. It applies to all processing engines, such as Hadoop MapReduce, Apache Spark, Apache Flink, and Apache Storm. It is also present in any processing pipeline, such as Java Stream (e.g. map, reduce, flatMap, filter), Akka Streams, and Project Reactor.
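As a small illustration with Java Stream, here is a sketch in which filter and map handle each element on its own, reduce carries an accumulator from one element to the next, and distinct (which the java.util.stream documentation classifies as a stateful intermediate operation) has to remember what it has already seen. The class and method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class JavaStreamSketch {

    // filter and map are stateless: each word is handled independently of the others.
    static List<String> upperCaseLongWords(List<String> words) {
        return words.stream()
                .filter(w -> w.length() > 3)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // reduce carries a running total from element to element.
    static int totalLength(List<String> words) {
        return words.stream()
                .map(String::length)
                .reduce(0, Integer::sum);
    }

    // distinct must remember the elements it has already seen.
    static List<String> unique(List<String> words) {
        return words.stream()
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("kafka", "is", "kafka", "fast");
        System.out.println(upperCaseLongWords(words)); // [KAFKA, KAFKA, FAST]
        System.out.println(totalLength(words));        // 16
        System.out.println(unique(words));             // [kafka, is, fast]
    }
}
```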

Thank you for the example, it helped in understanding the concept better – user3122771 May 17 '21 at 10:25