
I currently have some code that builds a KTable using aggregate:

inputTopic.groupByKey().aggregate(
    Aggregator::new,
    (key, value, aggregate) -> {
        someProcessingDoneHere;
        return aggregate;
    },
    Materialized.with(Serdes.String(), Serdes.String())
);
        

Once a given number of messages have been received and aggregated for a single key, I would like to push the latest aggregation state to another topic and then delete the key in the table.

I can obviously use a plain Kafka producer and have something like:

inputTopic.groupByKey().aggregate(
    Aggregator::new,
    (key, value, aggregate) -> {
        someProcessingDoneHere;
        if (count > threshold) {
            producer.send(new ProducerRecord<String, String>("output-topic", key, aggregate));
            return null;
        }
        return aggregate;
    },
    Materialized.with(Serdes.String(), Serdes.String())
);
        

But I'm looking for a more "Streams"-style approach that does not rely on a separate producer.

Any hints?

user3923073

1 Answer


I think the best solution here is to convert the aggregation back to a stream, then filter for the values you want before sending them to a topic.

inputTopic.groupByKey().aggregate(
    Aggregator::new,
    (key, value, aggregate) -> {
        someProcessingDoneHere;
        return aggregate;
    },
    Materialized.with(Serdes.String(), Serdes.String())
)
.toStream()
.filter((key, value) -> value.count > threshold)
.to("output-topic");

Edit: I just realized you want to do this before it is serialized. I think the only way to do this is to use a Transformer or Processor instead of aggregate.

There you work with a StateStore directly instead of a KTable, and you also get access to context.forward(), which lets you forward a record downstream whenever you want.

Some pseudo-code showing how it could be done with transform:

@Override
public Transformer<String, String, KeyValue<String, String>> get() {
    return new Transformer<String, String, KeyValue<String, String>>() {

        private KeyValueStore<String, String> stateStore;
        private ProcessorContext context;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
            stateStore = (KeyValueStore<String, String>) context.getStateStore(STATE_STORE_NAME);
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
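            String prevAggregation = stateStore.get(key); // null if the key has no state yet
            // Placeholder aggregation (an assumption): concatenate values; replace with your own logic.
            String newAggregation = (prevAggregation == null) ? value : prevAggregation + value;
            if (newAggregation.length() > threshold) {
                // Threshold reached: emit the final aggregate downstream and drop the key's state.
                context.forward(key, newAggregation);
                stateStore.delete(key);
            } else {
                stateStore.put(key, newAggregation);
            }
            return null; // returning null emits nothing; the result was already forwarded above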
        }

        @Override
        public void close() {
            // Note: The store should NOT be closed manually here via `stateStore.close()`!
            // The Kafka Streams API will automatically close stores when necessary.
        }
    };
}
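
To make the get/put/delete flow easier to follow, here is a minimal plain-Java sketch of the same logic with no Kafka dependency. Everything in it is a stand-in chosen for illustration: the class name `ThresholdAggregator`, the `HashMap` that plays the role of the state store, the list that plays the role of `context.forward()`, and the string-concatenation aggregation are all assumptions, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the transformer logic above (hypothetical class, no Kafka). */
public class ThresholdAggregator {
    // Stand-in for the KeyValueStore: one aggregate per record key.
    private final Map<String, String> store = new HashMap<>();
    // Stand-in for context.forward(): collects what would be sent downstream.
    private final List<String> forwarded = new ArrayList<>();
    private final int threshold;

    public ThresholdAggregator(int threshold) {
        this.threshold = threshold;
    }

    /** Mirrors transform(key, value): aggregate, then either emit and delete, or store. */
    public void process(String key, String value) {
        String prev = store.get(key);
        // Hypothetical aggregation: concatenate values. Replace with real logic.
        String next = (prev == null) ? value : prev + value;
        if (next.length() > threshold) {
            forwarded.add(key + "=" + next); // emit the final aggregate
            store.remove(key);               // drop the key's state
        } else {
            store.put(key, next);            // keep accumulating
        }
    }

    public List<String> forwarded() {
        return forwarded;
    }

    public boolean hasState(String key) {
        return store.containsKey(key);
    }

    public static void main(String[] args) {
        ThresholdAggregator agg = new ThresholdAggregator(2);
        agg.process("a", "x");
        agg.process("b", "y");
        agg.process("a", "x");
        agg.process("a", "x"); // "xxx" exceeds the threshold: emitted, then "a" is deleted
        System.out.println(agg.forwarded());                              // [a=xxx]
        System.out.println(agg.hasState("a") + " " + agg.hasState("b"));  // false true
    }
}
```

Because state is looked up by the record's key, each key accumulates independently, and a key that has been emitted starts from scratch on its next record.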
Emil
  • Thanks for your answer. However, the problem with the transformer/processor approach is that we cannot leverage the groupBy preceding aggregate, and I am not sure how to group by inside a processor, or whether this design is ultimately better than using a simple producer. (Good tip about `context.forward()` anyway.) – Donatello Feb 23 '22 at 17:23
  • The second answer could be more detailed, but groupByKey is not necessary. process gives you access to a [KeyValueStore](https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/KeyValueStore.html). As long as you `get()` and `put()` your aggregation using the record's key, you should be fine. – Emil Feb 23 '22 at 17:45