0

I have a working code for kafkalistener to read messages from the beginning(offset=0) of a topic (always running). For my use case (messaging) I need 2 things:

  1. always catch new messages(this consumer is always running) of specific topic/partition and send to frontend websocket+stomp. (I already have this part)

  2. start new consumer to get messages from beginning to current of specific topic/partition, only when frontend signals and then stop after that so that these data(loading previous messages for the late user or for later) can be fetched by frontend websocket+stomp (at the beginning of its session)

If I can dynamically (after getting signal from frontend) add/remove kafkaListener with parameters(data from post method) it will serve both

actually, how can I implement this? should I think of using post method to notify backend that I need to load previous messages of this topic/partition right now and send those it to this ".." url? but then how can I dynamically start and off that consumer(kafkaListener) without running all the time and pass parameter there?

Nafiul Alam Fuji
  • 407
  • 7
  • 17

1 Answers1

1

Here is a quick Spring Boot application showing how to dynamically create containers.

@SpringBootApplication
public class So61950229Application {

    public static void main(String[] args) {
        SpringApplication.run(So61950229Application.class, args);
    }


    @Bean
    public ApplicationRunner runner(DynamicListener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so61950229", "foo" + i));
            System.out.println("Hit enter to start a listener");
            System.in.read();
            listener.newContainer("so61950229", 0);
            System.out.println("Hit enter to start another listener");
            System.in.read();
            listener.newContainer("so61950229", 0);
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61950229").partitions(1).replicas(1).build();
    }

}

@Component
class DynamicListener {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicListener.class);

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final ConcurrentMap<String, AbstractMessageListenerContainer<String, String>> containers
            = new ConcurrentHashMap<>();

    DynamicListener(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    void newContainer(String topic, int partition) {
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topic, partition));
        String groupId = UUID.randomUUID().toString();
        container.getContainerProperties().setGroupId(groupId);
        container.setupMessageListener((MessageListener) record -> {
            System.out.println(record);
        });
        this.containers.put(groupId, container);
        container.start();
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        AbstractMessageListenerContainer<String, String> container = this.containers.remove(
                event.getContainer(ConcurrentMessageListenerContainer.class).getContainerProperties().getGroupId());
        if (container != null) {
            LOG.info("Stopping idle container");
            container.stop(() -> LOG.info("Stopped"));
        }
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5000
Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • srry for late.um using KafkaListener annotation in listener(setting group,topic,partitions,containerfactory,initial offset) and EnableKafka+Configuration in consumer cofiguration where I have ConcurrentKafkaListenerContainerFactory. I hope you already understand my architecture.but all of this is static.I wanna implement above like class architecture what you said here and in docs. I want the @KafkaListener listener as a class so that I can create my listener's with desired configuration at any time stop them. can you kindly give an example with this? I saw in the docs but I lost as a newbie. – Nafiul Alam Fuji Jun 01 '20 at 08:55
  • Sorry; it's not clear what you mean; the code above should do what you need. Perhaps you can explain what it does not do that you need? – Gary Russell Jun 01 '20 at 13:09
  • I am sorry as I failed to explain.. yours is one way but I am following another approach for kafka consumer where the structure will vary. I am using @KafkaListener annotation for kafka listener where I set consumer group, topic,partition etc.. and for this setup the class structure may be different that I saw in the docs – Nafiul Alam Fuji Jun 02 '20 at 17:10
  • I still don't understand the problem. By definition, `@KafkaListener` is static. I don't know how you intend to use such static configuration in a dynamic fashion. If you explain exactly what your use case is I might be able to help. – Gary Russell Jun 02 '20 at 17:14