
We are using a KTable for aggregation in Kafka Streams; it is a very basic use case and we followed the Kafka documentation.

I am trying to work out how, if the consumption of some message fails while aggregating, we can move that message to an error topic or dead letter queue (DLQ).

I found something similar for KStream, but nothing for KTable, and I was not able to simply extend the KStream solution to KTable.

Reference for KStream: Handling bad messages using Kafka's Streams API

My use case is very simple: for any kind of exception, just move the message to an error topic and move on to the next message.
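
For context, here is a minimal sketch of the kind of KStream-side guard described in the linked answer, adapted so that it feeds an aggregation: records that fail a hypothetical validation check are branched off to an assumed `error-topic` before they ever reach the aggregator. The topic names, the validation check, and the aggregation logic are placeholders, not part of the original question.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class GuardedAggregation {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // Branch 0: records that pass the validation check and may be aggregated.
        // Branch 1: everything else, which is diverted to the error topic.
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches = input.branch(
            (key, value) -> canProcess(value),
            (key, value) -> true);

        branches[1].to("error-topic");  // assumed DLQ topic name

        // Only the "good" branch feeds the aggregation.
        KTable<String, Long> aggregated = branches[0]
            .groupByKey()
            .aggregate(
                () -> 0L,
                (key, value, agg) -> agg + value.length(),  // placeholder aggregation logic
                Materialized.with(Serdes.String(), Serdes.Long()));

        aggregated.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "guarded-aggregation");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }

    // Hypothetical validation: returns false for records that would make the
    // aggregator throw, so they can be diverted instead of crashing the app.
    private static boolean canProcess(String value) {
        return value != null && !value.isEmpty();
    }
}
```

This only diverts records that can be rejected up front; exceptions thrown inside the aggregator itself are a separate problem, which is what the rest of the discussion is about.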

OneCricketeer
Abhishek
  • Have you seen https://docs.confluent.io/current/streams/faq.html#option-3-quarantine-corrupted-records-dead-letter-queue ? – miguno Apr 08 '19 at 07:04
  • Yes, I have read this, but it looks like it only handles ser/deser errors; what I am looking for is any kind of exception, whether application related or Kafka related. – Abhishek Apr 08 '19 at 07:40
  • 1
    Perhaps you can clarify the failure scenarios you are trying to cover when you say "I am just trying to investigate that if some message consumption fails". – miguno Apr 08 '19 at 08:11
  • It could be any unforeseen issue, like an NPE in the application logic while aggregating, or let's say I am publishing the aggregated result to another Kafka topic and that broker is down; it could be anything. Is there a catch-all kind of exception handling? Basically I want to achieve the same result as we have in MQ, where if there is any kind of error it just puts the message on the error queue and moves on to the next message. I don't want my application to go down if there is a corrupt message; I want my application to move that corrupt message to an error topic and move on to the next message. – Abhishek Apr 08 '19 at 08:15
  • "lets say i am publishing the aggregated result to another kafka topic and that broker is down or it could be anything." This type of errors is automatically handled by Kafka Streams. – miguno Apr 08 '19 at 09:55
  • That was just an example; is there a catch-all kind of handler, just like we have in MQ? How can we handle any unforeseen error, application NPE, etc.? – Abhishek Apr 08 '19 at 09:59
  • @MichaelG.Noll I also noticed that you have described the solution for KStream (link mentioned in the question); does a similar solution exist for KTable as well? – Abhishek Apr 09 '19 at 09:02
  • Kafka Streams supports an `UncaughtExceptionHandler`. See e.g. https://docs.confluent.io/current/streams/developer-guide/write-streams.html#using-kafka-streams-within-your-application-code. – miguno Apr 09 '19 at 10:35
  • I went through this link as well, but this is not what we want; we don't want the app to terminate, we want the app to continue processing subsequent messages by moving the bad message to an error topic. – Abhishek Apr 09 '19 at 10:48
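
For completeness, this is roughly how the `UncaughtExceptionHandler` mentioned in the comments above is registered (Kafka 2.2-era API). As the last comment notes, it only lets you observe that a stream thread has died; it does not skip the bad record or keep the application processing. The topology, topic names, and configs here are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class HandlerRegistration {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");  // placeholder topology

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Fires when a stream thread dies with an unhandled exception. It only
        // lets you log or react (e.g. shut down cleanly); it does not skip the
        // offending record or keep the thread alive.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable));

        streams.start();
    }
}
```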

1 Answer


There is no built-in support for what you ask at the moment (Kafka 2.2); you need to make sure that your application code does not throw any exceptions. All the handlers that can be configured are for exceptions thrown by the Kafka Streams runtime itself. Those handlers are provided because otherwise the user would have no chance at all to react to those exceptions.
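
One way to follow this advice, as a sketch rather than a prescribed solution: catch exceptions inside your own aggregator, forward the offending record to an assumed `error-topic` with a separately constructed producer, and return the previous aggregate unchanged. The topic names and aggregation logic below are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class CatchInsideAggregator {

    // Hypothetical standalone producer used to forward bad records to the DLQ;
    // it is separate from the producer managed by Kafka Streams.
    private static final KafkaProducer<String, String> dlqProducer = buildDlqProducer();

    public static KTable<String, Long> build(StreamsBuilder builder) {
        return builder.<String, String>stream("input-topic")
            .groupByKey()
            .aggregate(
                () -> 0L,
                (key, value, aggregate) -> {
                    try {
                        // Placeholder for the real aggregation logic, which may throw.
                        return aggregate + Long.parseLong(value);
                    } catch (Exception e) {
                        // Swallow the failure, forward the record, keep the old aggregate.
                        dlqProducer.send(new ProducerRecord<>("error-topic", key, value));
                        return aggregate;
                    }
                },
                Materialized.with(Serdes.String(), Serdes.Long()));
    }

    private static KafkaProducer<String, String> buildDlqProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
```

Note that the write to the error topic happens on a separate producer, outside any Streams transaction, so under exactly-once processing the DLQ write is not covered by the same guarantees as the aggregation itself.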

Feel free to create a feature request Jira.

Matthias J. Sax