I am working on a Kafka Streams application using Spring Cloud Stream. While processing a message, an error can occur (for example, a failed database call). In that case the offset for that message should not be committed, and processing of the same message should be retried.
My application method:
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;

@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
    return input -> {
        // split each message into upper-case words
        KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
        // re-key by word and group for counting
        KGroupedStream<String, String> kgt = kt.map((k, v) -> new KeyValue<>(v, v))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        // count words in 500 ms windows
        KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(Duration.ofMillis(500))).count();
        // persist each count through the DAO and forward it downstream
        KStream<String, WordCount> kst = ktable.toStream().map((k, v) -> {
            WordCount wc = new WordCount();
            wc.setWord(k.key());
            wc.setCount(v);
            wc.setStart(new Date(k.window().start()));
            wc.setEnd(new Date(k.window().end()));
            dao.insert(wc); // this call can fail
            return new KeyValue<>(k.key(), wc);
        });
        return kst.map((k, v) -> new KeyValue<>(k, v.getCount()));
    };
}
Here, if the DAO insert fails, the record should not be published to the output topic, and processing of the same record should be retried.
How can we configure the Kafka Streams binder to do this? Any help is much appreciated.
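For reference, the only workaround I have sketched so far is to wrap the DAO call in a Spring Retry RetryTemplate inside the topology (this assumes spring-retry is on the classpath; the retryTemplate field, maxAttempts(3) and fixedBackoff(1000) below are placeholder values I picked, not anything prescribed by the binder). I don't know whether this is the recommended approach or whether the binder offers something built in:

import org.springframework.retry.support.RetryTemplate;

// hypothetical sketch: retry the insert a few times before letting the exception propagate
RetryTemplate retryTemplate = RetryTemplate.builder()
        .maxAttempts(3)
        .fixedBackoff(1000)
        .build();

// inside the map() above, instead of calling dao.insert(wc) directly:
retryTemplate.execute(context -> {
    dao.insert(wc);
    return null;
});

Even with this, once the retries are exhausted I still need the record not to be committed or published, which is really what my question is about.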