2

I am trying to set up a service that has both a REST (POST) endpoint and a Kafka endpoint, both of which should take a JSON representation of the request object (let's call it Foo). I want to make sure that the Foo object is valid (via JSR-303 or whatever). So Foo might look like:

public class Foo {
    @Max(10)
    private int bar;

    // Getter and setter boilerplate
}

Setting up the REST endpoint is easy:

@PostMapping(value = "/", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> restEndpoint(@Valid @RequestBody Foo foo) {
    // Do stuff here
}

If I POST { "bar": 9 }, it processes the request, but if I POST { "bar": 99 }, I get a BAD REQUEST. All good so far!

The Kafka endpoint is easy to create (along with adding a StringJsonMessageConverter to my KafkaListenerContainerFactory so that I get JSON-to-object conversion):

@KafkaListener(topics = "fooTopic")
public void kafkaEndpoint(@Valid @Payload Foo foo) {
    // I shouldn't get here with an invalid object!!!
    logger.debug("Successfully processed the object" + foo);

    // But just to make sure, let's see if hand-validating it works
    Validator validator = localValidatorFactoryBean.getValidator();
    Set<ConstraintViolation<Foo>> errors = validator.validate(foo);
    if (!errors.isEmpty()) {
        logger.debug("But there were validation errors!" + errors);
    }
}

But no matter what I try, I can still pass invalid requests in and they process without error.

I've tried both @Valid and @Validated. I've tried adding a MethodValidationPostProcessor bean. I've tried adding a Validator to the KafkaListenerEndpointRegistrar (a la the EnableKafka javadoc):

@Configuration
public class MiscellaneousConfiguration implements KafkaListenerConfigurer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    LocalValidatorFactoryBean validatorFactory;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        logger.debug("Configuring " + registrar);
        registrar.setMessageHandlerMethodFactory(kafkaHandlerMethodFactory());

    }

    @Bean
    public MessageHandlerMethodFactory kafkaHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(validatorFactory);
        return factory;
    }
}

I've now spent a few days on this, and I'm running out of ideas. Is this even possible (without writing validation into every one of my Kafka endpoints)?

cmreigrut

3 Answers

1

Sorry for the delay; we are at SpringOne Platform this week.

The infrastructure currently does not pass a Validator into the payload argument resolver. Please open an issue on GitHub.

Gary Russell
0

By default, Spring Kafka listeners do not process @Valid on classes that are not REST controllers. For more details, please refer to this answer:

https://stackoverflow.com/a/71859991/13898185

0

From my research, Spring uses Jackson to convert the incoming JSON for its internal processing, and Spring Kafka does not yet support validating the result. What we can do instead is use javax.validation and invoke it manually. This is how I did it:

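// Imports: java.util.Set and, from javax.validation, ConstraintViolation, ConstraintViolationException, Validation, Validator
private static final Validator validator;
static {
    validator = Validation.buildDefaultValidatorFactory().getValidator();
}

@KafkaListener(topics = "fooTopic")
public void kafkaEndpoint(Foo foo) {
    // validate() returns the set of violations; it does not throw by itself
    Set<ConstraintViolation<Foo>> violations = validator.validate(foo);
    if (!violations.isEmpty()) {
        throw new ConstraintViolationException(violations);
    }
    // Process the valid foo here
}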
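
If several listeners need the same manual check, it can be wrapped once rather than repeated in every endpoint. The following is only a minimal, framework-free sketch of that idea: ValidatingHandlerSketch, validating and checkFoo are hypothetical names, and checkFoo is a hand-written stand-in for validator.validate(foo) so that the example compiles without Bean Validation or Spring on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: none of these names come from Spring Kafka or Bean Validation.
final class ValidatingHandlerSketch {

    // Stand-in for the Foo payload from the question.
    static final class Foo {
        final int bar;
        Foo(int bar) { this.bar = bar; }
    }

    // Stand-in for validator.validate(foo): one message per violated rule.
    static List<String> checkFoo(Foo foo) {
        List<String> violations = new ArrayList<>();
        if (foo.bar > 10) {
            violations.add("bar must be less than or equal to 10");
        }
        return violations;
    }

    // Wraps a handler so that it only runs when the check reports no violations.
    static <T> Consumer<T> validating(Function<T, List<String>> check, Consumer<T> handler) {
        return payload -> {
            List<String> violations = check.apply(payload);
            if (!violations.isEmpty()) {
                throw new IllegalArgumentException("Invalid payload: " + violations);
            }
            handler.accept(payload);
        };
    }

    public static void main(String[] args) {
        List<Integer> processed = new ArrayList<>();
        Consumer<Foo> listener =
                validating(ValidatingHandlerSketch::checkFoo, foo -> processed.add(foo.bar));

        listener.accept(new Foo(9)); // valid: the handler runs
        try {
            listener.accept(new Foo(99)); // invalid: rejected before the handler runs
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(processed);
    }
}
```

In a real listener, the body of kafkaEndpoint would delegate to such a wrapped handler, with the check function calling the actual Validator instead of the stand-in.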
cigien