Using the Kafka Streams Processor API
Scenario: a stream processor (implemented with the Kafka Streams Processor API) reads data from a source topic and writes it to target topics based on some business logic.
Code:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsProcessor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_cluster.org:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Topology topology = new Topology();
topology.addSource("mySource", "source_topic");
topology.addProcessor("StreamsProcessor", () -> new StreamsProcessor(), "mySource");
topology.addSink("sink1", "output_topic", "StreamsProcessor");
topology.addSink("sink2", "output_topic2", "StreamsProcessor");
topology.addSink("sink3", "output_topic3", "StreamsProcessor");

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
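One related knob, not part of my original setup and listed here only as something I am considering: Kafka Streams forwards any property carrying the "producer." prefix to its embedded producer, so the producer's own retry settings can be tuned through the streams config. A minimal sketch using plain Properties (StreamsConfig.producerPrefix(...) builds the same keys):

```java
import java.util.Properties;

public class ProducerRetryConfigSketch {

    // Kafka Streams passes "producer."-prefixed settings on to its embedded
    // producer; StreamsConfig.producerPrefix("retries") yields the same key
    // as the literal string used below.
    public static Properties withProducerRetrySettings(Properties props) {
        props.put("producer.retries", Integer.MAX_VALUE);               // retry failed sends
        props.put("producer.delivery.timeout.ms", 120000);              // overall per-record send deadline
        props.put("producer.max.in.flight.requests.per.connection", 1); // keep ordering across retries
        return props;
    }

    public static void main(String[] args) {
        Properties props = withProducerRetrySettings(new Properties());
        System.out.println(props.get("producer.retries"));
        System.out.println(props.get("producer.delivery.timeout.ms"));
    }
}
```

These values (MAX_VALUE retries, 120 s delivery timeout) are illustrative defaults I picked, not recommendations from the docs.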
--------------------------------------------------------------
@Override
public void init(ProcessorContext context)
{
    this.context = context;
}

@Override
public void process(String key, String value)
{
    // Based on business logic, forward to sink1, sink2, or sink3
    context.forward(key, value, To.child("sink1"));
}
----------------------------------------------------------------
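For what it's worth, one naive approach I considered is retrying inside the processor with bounded backoff. This is only a sketch, under the assumption that the failing publish surfaces as an exception I can catch (the retryWithBackoff helper and its parameters are hypothetical, not part of any Kafka API):

```java
import java.util.function.Supplier;

public class RetrySketch {

    /**
     * Runs the action, retrying up to maxAttempts times and doubling the
     * backoff after each failed attempt; rethrows the last failure.
     * Assumes maxAttempts >= 1.
     */
    public static <T> T retryWithBackoff(Supplier<T> action, int maxAttempts, long initialBackoffMs)
            throws InterruptedException {
        RuntimeException last = null;
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a publish that fails twice, then succeeds.
        int[] calls = {0};
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient publish failure");
            return "delivered";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

I am not sure this is appropriate here, since blocking in process() stalls the stream thread, which is part of why I am asking.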
Question:
If the stream processor fails to publish messages to one or more of the target topics above, what are some good ways to implement a retry mechanism with the Kafka Streams Processor API?
Please share code snippets/links/best practices for handling these failure scenarios. Thanks.
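One mechanism I did find in the library itself is the default.production.exception.handler config, which decides whether a failed send fails the task or is skipped; sketched below under the assumption that it applies to this multi-sink scenario (the handler class name and logging are mine):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Sketch: invoked when a send to a sink topic fails after the producer's
// own retries are exhausted. The built-in DefaultProductionExceptionHandler
// fails the task instead.
public class LogAndContinueProductionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        System.err.println("Failed to publish to " + record.topic() + ": " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE; // or FAIL to stop processing
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}

// Registered via:
// props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
//           LogAndContinueProductionHandler.class);
```

CONTINUE here means the record is dropped rather than retried, so I am unsure how this interacts with EXACTLY_ONCE, and whether a retry belongs at this layer at all.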