
I am using Spring Cloud Stream with the Kafka binder. When my message is too large, I get the error

  ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='{123, 34, 105, 100, 34, 58, 34, 115, 105, 110, 103, 97, 112, 111, 114, 101, 104, 101, 114, 97, 108, ...' to topic page:
    org.apache.kafka.common.errors.RecordTooLargeException: The message is 4711755 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

This is my Spring Boot code for sending the message:

    private BinderAwareChannelResolver resolver;

    boolean isSent = this.resolver.resolveDestination(this.topic)
            .send(message);

Since I am getting the error, I should be able to catch RecordTooLargeException in my Spring Boot code. However, it is not being caught and the code continues. isSent is also returned as true. Shouldn't it return false? How can I catch this error and handle it? Thanks.

Ninja Dude

1 Answer


If the error is internal and is raised by a thread that is not under the direct control of your application code, you may want to use an UncaughtExceptionHandler:

    Thread.setDefaultUncaughtExceptionHandler((whichThread, whatException) -> {
        if (whatException.getClass()
                .equals(org.apache.kafka.common.errors.RecordTooLargeException.class)
                || (whatException.getCause() != null && whatException.getCause().getClass()
                        .equals(org.apache.kafka.common.errors.RecordTooLargeException.class))) {
            // handle the oversized record
        } else {
            // handle other exceptions
        }
    });
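As a quick self-contained illustration of the mechanism (reproducing a real RecordTooLargeException needs a running broker, so IllegalStateException is used here as a stand-in), the handler below observes an exception thrown on a background thread:

```java
public class UncaughtDemo {
    // volatile flag set by the handler so the main thread can observe it
    static volatile boolean handled = false;

    public static boolean demo() {
        // install a JVM-wide handler for threads with no handler of their own
        Thread.setDefaultUncaughtExceptionHandler((whichThread, whatException) -> {
            if (whatException instanceof IllegalStateException) {
                handled = true;
            }
        });
        // this exception escapes the worker thread, so the default handler fires
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated send failure");
        });
        worker.start();
        try {
            worker.join(); // the handler runs before the thread terminates
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("handled = " + demo()); // prints "handled = true"
    }
}
```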

But the above code only lets you know about the exception. If you need to send records larger than the current limit, you should raise it on the broker and the topic:

max.message.bytes (topic)
message.max.bytes (broker)
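The error itself points at the producer-side max.request.size limit, which may also need raising. A sketch of how that might look in application.yml for the Kafka binder — the exact property path is an assumption and can vary by Spring Cloud Stream version, so verify against the binder documentation for your release:

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # assumed property path; passed through to the Kafka producer
          producer-properties:
            max.request.size: 5242880   # ~5 MB, above the 4711755-byte record
```

The broker-side message.max.bytes and topic-side max.message.bytes must be raised in step, or the broker will reject what the producer now allows.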

P.S.: Instead of getClass().equals() you may want to use instanceof or Class.isAssignableFrom().
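The difference matters because getClass().equals() only matches the exact runtime class, while instanceof and isAssignableFrom() also match subclasses. A minimal sketch with a stand-in hierarchy (Child extends Parent, mirroring a concrete Kafka exception extending a broader one):

```java
public class MatchDemo {
    // stand-in exception hierarchy, not real Kafka classes
    static class Parent extends RuntimeException {}
    static class Child extends Parent {}

    public static void main(String[] args) {
        Throwable t = new Child();
        // exact-class comparison misses the supertype
        System.out.println(t.getClass().equals(Parent.class));           // false
        // instanceof walks the type hierarchy
        System.out.println(t instanceof Parent);                         // true
        // isAssignableFrom: could a Parent reference hold this class?
        System.out.println(Parent.class.isAssignableFrom(t.getClass())); // true
    }
}
```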

Update:

I encountered the same error with KafkaProducer, and there the exception is delivered to the send Callback:

    producer.send(new ProducerRecord<>(topic, key, value), (recordMetadata, exception) -> {
        if (exception instanceof RecordTooLargeException) {
            // handling code
        }
    });

See also: implementing a Spring Kafka asynchronous callback.

JavaTechnical