0

I just want to understand that what is the scope of @kafkaListener, either prototype or singleton. In case of multiple consumers of a single topic, is it return the single instance or multiple instances. In my case, I have multiple customers are subscribed to single topic and get the reports. I just wanted to know, what would happen, if

  • multiple customers wants to query for the report on the same time. In my case, I am closing the container after successful consumption of messages but at the same time if some other person wants to fetch reports, the container should be open.

  • how to change the scope to prototype (if it is not) associated with Id's of container, so that each time a separate instance can be generated.

    @KafkaListener(id = "id1", topics = "testTopic" )
     public void listen() {
        // code goes here
    }
    
Sumit Sood
  • 441
  • 7
  • 23

2 Answers2

0

A Single Listener Instance is invoked for all consuming Threads.

The annotation @KafkaListener is not Prototype scoped, and it is not possible with this annotation either.

4.1.10. Thread Safety

When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:

    Use n containers with concurrency=1 with a prototype scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).

    Keep the state in ThreadLocal<?> instances.

    Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).

To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits. You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope. Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
    By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective. 

enter image description here

https://docs.spring.io/spring-kafka/reference/html/

=== Edited ===

Lets take their 3rd option (Delcaring a SimpleThreadScope and delegating to it)

Register SimpleThreadScope . It is not picked up automatically. You need to register it like below:

@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
    return new BeanFactoryPostProcessor() {
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

            beanFactory.registerScope("thread", new SimpleThreadScope());
        }
    };
}

Create a component with scopeName = "thread"

    @Component
    @Scope(scopeName = "thread", proxyMode = ScopedProxyMode.TARGET_CLASS)
    public class KafkaDelegate{


     public void handleMessageFromKafkaListener(String message){
  
             //Do some stuff here with Message
    }
}

Create a @Service

public class KafkaListenerService{


    @Autowired
    private KafkaDelegate kafkaDelegate;

    
    @KafkaListener(id = "id1", topics = "testTopic" )
    public void listen(String message) {
        kafkaDelete.handleMessageFromKafkaListener(message);
    }

}

Another example: How to implement a stateful message listener using Spring Kafka?

JCompetence
  • 6,997
  • 3
  • 19
  • 26
  • how can we achieve that? any example? – Sumit Sood Sep 13 '21 at 11:39
  • You can use a prototype scope bean with `@KafkaListener` - see [this answer](https://stackoverflow.com/questions/68744775/can-i-add-topics-to-my-kafkalistener-at-runtime/68745230#68745230). – Gary Russell Sep 13 '21 at 14:18
0

See this answer for an example of how to use a prototype scoped @KafkaListener bean.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179