Kafka Streams provides two semantics: emit-on-update and emit-on-window-close.
KIP-557 is about adding an emit-on-change semantic based on byte-array comparison of the data. It was implemented in Kafka Streams 2.6 and then removed due to "potential data loss".
Nevertheless, I have developed an implementation of the emit-on-change semantic, by using the Kafka Streams DSL.
The idea is to convert a KStream with emit-on-update semantics into a KStream with emit-on-change semantics. You can apply this implementation to the source KStream that you use to create the KTable, or to the KTable after calling .toStream().
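To make the difference between the two semantics concrete, here is a plain-Java simulation that does not use Kafka at all. It is a sketch under stated assumptions: a HashMap stands in for the state store, records are hypothetical key/value pairs of strings, and "changed" means not equal according to Objects.equals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Plain-Java simulation (no Kafka dependency) of emit-on-update vs emit-on-change.
// Each record is a String[] of {key, value}.
public class EmitOnChangeSimulation {

    // emit-on-update: every incoming record produces an output record.
    static List<String> emitOnUpdate(List<String[]> records) {
        List<String> out = new ArrayList<>();
        for (String[] r : records) {
            out.add(r[0] + "=" + r[1]);
        }
        return out;
    }

    // emit-on-change: a record is forwarded only if its value differs
    // from the last value seen for the same key.
    static List<String> emitOnChange(List<String[]> records) {
        Map<String, String> lastValue = new HashMap<>(); // stands in for the state store
        List<String> out = new ArrayList<>();
        for (String[] r : records) {
            String key = r[0];
            String value = r[1];
            boolean seen = lastValue.containsKey(key);
            String previous = lastValue.put(key, value); // always store the latest value
            if (!seen || !Objects.equals(previous, value)) {
                out.add(key + "=" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[]{"p1", "red"},
            new String[]{"p1", "red"},   // duplicate update
            new String[]{"p2", "blue"},
            new String[]{"p1", "green"},
            new String[]{"p2", "blue"}); // duplicate update
        System.out.println(emitOnUpdate(records)); // [p1=red, p1=red, p2=blue, p1=green, p2=blue]
        System.out.println(emitOnChange(records)); // [p1=red, p2=blue, p1=green]
    }
}
```

The two duplicate updates are forwarded under emit-on-update but dropped under emit-on-change, which is exactly what the aggregate/filter combination below aims for.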
This implementation implicitly creates a state store whose value holds the KStream data and a flag indicating whether an update should be emitted. The flag is set in the aggregate operation and relies on Object#equals for comparison, but you could change the implementation to use a Comparator.
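As a sketch of the Comparator alternative, the hypothetical State class below mirrors EmitOnChangeState but takes a Comparator in merge and treats a compare result of 0 as "unchanged". The class names and the case-insensitive example are assumptions for illustration only.

```java
import java.util.Comparator;

// Hypothetical Comparator-based variant of EmitOnChangeState.
public class ComparatorEmitOnChange {

    public static final class State<T> {
        public final T data;
        public final boolean shouldEmit;

        public State(T data, boolean shouldEmit) {
            this.data = data;
            this.shouldEmit = shouldEmit;
        }

        // Emit only when the comparator reports a difference (compare != 0).
        public State<T> merge(T newData, Comparator<? super T> comparator) {
            return new State<>(newData, comparator.compare(data, newData) != 0);
        }
    }

    public static void main(String[] args) {
        // Example: names that differ only by letter case count as unchanged.
        Comparator<String> ignoreCase = String.CASE_INSENSITIVE_ORDER;
        State<String> s = new State<>("Laptop", true);
        s = s.merge("LAPTOP", ignoreCase);
        System.out.println(s.shouldEmit); // false
        s = s.merge("Tablet", ignoreCase);
        System.out.println(s.shouldEmit); // true
    }
}
```

Passing the comparator to merge rather than storing it in the state keeps it out of the state store, so a serde only has to handle the data and the flag. In the aggregator, the call would then become something like state.merge(data, comparator).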
Here is the withEmitOnChange method that changes the semantics of a KStream. You might have to specify a serde for the EmitOnChangeState data structure (see below).
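```java
import org.apache.kafka.streams.kstream.KStream;

public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams) {
    return streams
        .groupByKey()
        // Keep, per key, the latest value plus a flag telling whether it changed.
        .aggregate(
            () -> (EmitOnChangeState<V>) null,
            (k, data, state) -> {
                if (state == null) {
                    // First value seen for this key: always emit it.
                    return new EmitOnChangeState<>(data, true);
                } else {
                    return state.merge(data);
                }
            }
        )
        .toStream()
        // Drop updates whose value did not change.
        .filter((k, state) -> state.shouldEmit)
        .mapValues(state -> state.data);
}
```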
Here is the data structure that is stored in the state store and used to check whether an update should be emitted.
```java
import java.util.Objects;

public static class EmitOnChangeState<T> {
    public final T data;
    public final boolean shouldEmit;

    public EmitOnChangeState(T data, boolean shouldEmit) {
        this.data = data;
        this.shouldEmit = shouldEmit;
    }

    // Emit only if the new value differs from the stored one.
    public EmitOnChangeState<T> merge(T newData) {
        return new EmitOnChangeState<>(newData, !Objects.equals(data, newData));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
        return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
    }

    @Override
    public int hashCode() {
        return Objects.hash(data, shouldEmit);
    }
}
```
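A serde for EmitOnChangeState has to encode both the data and the flag. The sketch below uses only the JDK and assumes non-null String data plus a hypothetical byte layout: one flag byte followed by the UTF-8 bytes of the value. The class names are illustrative. In a real topology, encode/decode could be wrapped in a Kafka Serializer and Deserializer, combined with Serdes.serdeFrom, and passed to aggregate through Materialized.with(...).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical layout: [1 byte shouldEmit flag][UTF-8 bytes of data].
public class EmitOnChangeStateCodec {

    // Minimal stand-in for EmitOnChangeState<String> so the sketch compiles on its own.
    public static final class State {
        public final String data; // assumed non-null
        public final boolean shouldEmit;

        public State(String data, boolean shouldEmit) {
            this.data = data;
            this.shouldEmit = shouldEmit;
        }
    }

    public static byte[] encode(State state) {
        byte[] payload = state.data.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + payload.length)
                .put((byte) (state.shouldEmit ? 1 : 0)) // flag byte
                .put(payload)                           // value bytes
                .array();
    }

    public static State decode(byte[] bytes) {
        boolean shouldEmit = bytes[0] == 1;
        String data = new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
        return new State(data, shouldEmit);
    }

    public static void main(String[] args) {
        State copy = decode(encode(new State("red", true)));
        System.out.println(copy.data + " " + copy.shouldEmit); // red true
    }
}
```

For non-String values, the payload bytes would come from the value's own serializer instead of getBytes.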
Usage:
```java
KStream<ProductKey, Product> products = builder.stream("product-topic");
withEmitOnChange(products)
    .to("out-product-topic"); // output topic with emit-on-change semantics
```