0

I'm trying to create multiple implementation classes for a kafka event based on the type of event.

public class KafkaListener {
    @Autowired
    Service service;

    @KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")
    public void consumeSource(Object event) {
        service.process(event);
    }
}

public interface Service<E> {
    void process(E event);
}

public class ServiceImpl1 implements Service<Event1> {
    void process(Event1 event1) {
         // process 
    }
}

public class ServiceImpl2 implements Service<Event2> {
    void process(Event2 event2) {
         // process 
    }
}

//Event1 & Event2 are 2 POJO classes with different inputs

Is it possible to implement or am I supposed to create multiple listener, one for each event type?

Spartan
  • 339
  • 1
  • 3
  • 14

1 Answers1

1

As long as the events are deserialized by Kafka, you can use a class level @KafkaListener with method level @KafkaHandlers.

See the documentation.

When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true`)
    public void listenDefault(Object object) {
        ...
    }

}

Starting with version 2.1.3, you can designate a @KafkaHandler method as the default method that is invoked if there is no match on other methods. At most, one method can be so designated. When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Hello @Gary, is it possible to somehow configure on Kafka Listener configuration level that events with not supported type will be skipped? We have hundreds of KafkaListeners per microservices and it's very confusing when someone forgets to add "default method" – Aventes Jan 28 '21 at 12:54
  • 1
    Such questions are best asked as a new question, rather than in a comment on an old anser, so that a complete answer with code can be provided. I have asked and answered it [here](https://stackoverflow.com/questions/65943168/how-to-handle-a-kafka-record-with-a-class-level-kafkalistener-with-no-kafkahan). – Gary Russell Jan 28 '21 at 18:49