
Using the Kafka Streams Processor API

Scenario: a stream processor (implemented with the Kafka Streams Processor API) reads data from a source topic and writes it to target topics based on some business logic.

Code:

  import java.util.Properties;
  import org.apache.kafka.common.serialization.Serdes;
  import org.apache.kafka.streams.KafkaStreams;
  import org.apache.kafka.streams.StreamsConfig;
  import org.apache.kafka.streams.Topology;

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsProcessor");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_cluster.org:9092");
  props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

  // source_topic -> StreamsProcessor -> sink1 / sink2 / sink3
  Topology topology = new Topology();
  topology.addSource("mySource", "source_topic");
  topology.addProcessor("StreamsProcessor", () -> new StreamsProcessor(), "mySource");
  topology.addSink("sink1", "output_topic", "StreamsProcessor");
  topology.addSink("sink2", "output_topic2", "StreamsProcessor");
  topology.addSink("sink3", "output_topic3", "StreamsProcessor");

  KafkaStreams streams = new KafkaStreams(topology, props);
  // Close the Streams instance cleanly when the JVM shuts down
  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  streams.start();
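  --------------------------------------------------------------
  import org.apache.kafka.streams.processor.Processor;
  import org.apache.kafka.streams.processor.ProcessorContext;
  import org.apache.kafka.streams.processor.To;

  public class StreamsProcessor implements Processor<String, String>
  {
      private ProcessorContext context;

      @Override
      public void init(ProcessorContext context)
      {
          this.context = context;
          context.commit();
      }

      @Override
      public void process(String key, String value)
      {
          // Based on the business logic, forward (possibly in a loop) to sink1, sink2 and/or sink3
          context.forward(key, value, To.child("sink1"));
      }

      @Override
      public void close()
      {
      }
  }
  ----------------------------------------------------------------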

Question:

If the stream processor fails to publish messages to one or more of the target topics above, what are the best ways to implement a retry mechanism with the Kafka Streams Processor API?
Please share code snippets, links, or best practices for handling failure scenarios. Thanks.

Maazen
  • It depends on what type of failure occurred. There are many possible failure points: deserialization issues, errors while processing an event, errors while producing a message to the destination Kafka topic, etc. If the destination Kafka cluster is temporarily unavailable, Kafka Streams can retry via the `retries` property, e.g. `retries: 10`. Also, take a look at error handling in Kafka Streams: https://stackoverflow.com/a/51299739/2335775. – Vasyl Sarzhynskyi Jun 27 '20 at 14:00
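A minimal sketch of the configuration side of this comment, assuming Kafka 2.1 or later (where the producer has `delivery.timeout.ms`). The keys are written as plain strings so the snippet compiles without the Kafka client jar; `StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)` produces the same `producer.retries` key. The class and method names are hypothetical, and the values (10 retries, 500 ms backoff, 120 s delivery timeout) are illustrative only.

```java
import java.util.Properties;

// Hypothetical helper: Streams properties that make the internal producer retry
// transient send failures. Keys are plain strings so this compiles without the Kafka jar.
final class RetryConfigSketch {

    static Properties retryingStreamsProps(int retries, long backoffMs, long deliveryTimeoutMs) {
        Properties props = new Properties();
        props.put("application.id", "StreamsProcessor");
        props.put("bootstrap.servers", "dev_cluster.org:9092");
        // The "producer." prefix routes these settings to the producer used by Kafka Streams
        // (same key as StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)).
        props.put("producer.retries", Integer.toString(retries));
        props.put("producer.retry.backoff.ms", Long.toString(backoffMs));
        // Upper bound on the total time a send may take, including all retries
        props.put("producer.delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        // What happens once the producer gives up: the default handler fails the application
        props.put("default.production.exception.handler",
                "org.apache.kafka.streams.errors.DefaultProductionExceptionHandler");
        // Log and skip records that cannot be deserialized instead of stopping
        props.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        Properties props = retryingStreamsProps(10, 500L, 120_000L);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Producer retries only cover transient send errors, and the producer stops retrying when either the retry count or `delivery.timeout.ms` runs out. After that, the configured `default.production.exception.handler` decides what happens: `DefaultProductionExceptionHandler` stops the application, while a custom `ProductionExceptionHandler` can return `CONTINUE` to skip the failed record for the errors it is consulted on.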

1 Answer


You can implement some kind of KafkaProducer that acts as a messageFailureHandler, and use it to send all the failed messages to a dedicated Kafka topic.

If you are familiar with the concept of a dead-letter queue in Kafka Connect, this is much the same (except that in Kafka Connect it is only a matter of configuration).
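A minimal, Kafka-free sketch of the dead-letter idea as it could be used inside the Processor API, combined with a small bounded retry around the business logic. `RetryThenDeadLetter`, `dead_letter_sink`, and the attempt count are hypothetical. It only covers failures in your own processing code: writes to the output topics are mostly reported asynchronously by the Streams producer, so wrapping `context.forward()` in a retry loop is not a reliable way to retry send failures.

```java
import java.util.function.Function;

// Hypothetical helper: retries the business logic a few times and, if it still fails,
// routes the original value to a dead-letter sink instead of stopping the application.
final class RetryThenDeadLetter {
    static final String OK_SINK = "sink1";             // sink from the question
    static final String DLQ_SINK = "dead_letter_sink"; // hypothetical extra sink

    // Where a record should be forwarded, what to forward, and why it failed (null on success)
    static final class Route {
        final String sink;
        final String payload;
        final String error;

        Route(String sink, String payload, String error) {
            this.sink = sink;
            this.payload = payload;
            this.error = error;
        }
    }

    private final Function<String, String> businessLogic;
    private final int maxAttempts;

    RetryThenDeadLetter(Function<String, String> businessLogic, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.businessLogic = businessLogic;
        this.maxAttempts = maxAttempts;
    }

    Route route(String value) {
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return new Route(OK_SINK, businessLogic.apply(value), null);
            } catch (RuntimeException e) {
                lastError = e; // immediate retry; no sleep, to avoid stalling the stream thread
            }
        }
        // Every attempt failed: forward the untouched input to the dead-letter sink
        return new Route(DLQ_SINK, value, lastError.getMessage());
    }

    public static void main(String[] args) {
        RetryThenDeadLetter router = new RetryThenDeadLetter(v -> {
            if (v.isEmpty()) {
                throw new IllegalArgumentException("empty value");
            }
            return v.toUpperCase();
        }, 3);

        Route ok = router.route("abc");
        System.out.println(ok.sink + " " + ok.payload);       // sink1 ABC
        Route failed = router.route("");
        System.out.println(failed.sink + " " + failed.error); // dead_letter_sink empty value
    }
}
```

Inside `process()` you would then call `context.forward(key, r.payload, To.child(r.sink))`, after registering the extra sink with something like `topology.addSink("dead_letter_sink", "dead_letter_topic", "StreamsProcessor")` (topic name hypothetical). Writing failures through a sink node rather than a separate `KafkaProducer` keeps them in the same exactly-once transaction as the other outputs. Retries are kept immediate and short on purpose: blocking in `process()` stalls the stream thread, and blocking longer than `max.poll.interval.ms` makes the consumer leave the group and triggers a rebalance.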

Yoni Elisha ッ
  • Thanks Yoni, but I am looking for a solution specific to the Kafka Streams Processor API. Also, please share some links, snippets, docs, etc. – Maazen Jun 26 '20 at 17:47
  • For example, this gives a good overview of how to implement a Kafka producer: https://dzone.com/articles/take-a-deep-dive-into-kafka-producer-api ... If you want a KStreams solution, having a topic that receives all the failed messages can be a nice solution. – Yoni Elisha ッ Jun 27 '20 at 09:39
  • You can use a discard-topic together with a Validator class that applies pattern-matching conditions: if a message doesn't meet the conditions, redirect it to the discard-topic. – yassinec Mar 20 '23 at 14:11
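The validator idea from the last comment can be sketched as a small, Kafka-free class whose result is the name of the child sink to forward to. `PatternValidator`, `discard_sink`, and the `"<id>,<amount>"` value format are assumptions made up for illustration; in the processor you would call `context.forward(key, value, To.child(validator.sinkFor(value)))` with a sink such as `discard_sink` added to the topology for the discard-topic.

```java
import java.util.regex.Pattern;

// Hypothetical validator: values that do not match the expected pattern go to a discard sink.
final class PatternValidator {
    static final String VALID_SINK = "sink1";          // sink from the question
    static final String DISCARD_SINK = "discard_sink"; // hypothetical sink writing to discard-topic

    private final Pattern expected;

    PatternValidator(String regex) {
        this.expected = Pattern.compile(regex);
    }

    // True only when the whole value matches the expected pattern
    boolean isValid(String value) {
        return value != null && expected.matcher(value).matches();
    }

    // Name of the child sink the processor should forward this value to
    String sinkFor(String value) {
        return isValid(value) ? VALID_SINK : DISCARD_SINK;
    }

    public static void main(String[] args) {
        // Assumed record format for illustration only: "<id>,<amount>", e.g. "42,19.99"
        PatternValidator validator = new PatternValidator("\\d+,\\d+(\\.\\d+)?");
        System.out.println(validator.sinkFor("42,19.99")); // sink1
        System.out.println(validator.sinkFor("oops"));     // discard_sink
    }
}
```

Validating up front keeps malformed records out of the business logic entirely, whereas the retry-then-dead-letter approach catches failures that only show up while processing; the two can be combined in the same processor.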