1

I have two Kafka clusters whose IPs I am fetching dynamically from the database. I am using @KafkaListener to create listeners. Now I want to create multiple Kafka listeners at runtime depending on the bootstrap server attribute (comma-separated values), each one listening to a different cluster. Can you please suggest how I can achieve this?

Spring-boot: 2.1.3.RELEASE Kafka-2.0.1 Java-8

Iqbal
  • 73
  • 3
  • 14

2 Answers

4

Your requirements are not clear, but assuming you want the same listener configuration to listen to multiple clusters, here is one solution: make the listener bean a prototype and mutate the container factory for each instance...

@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    // Prototype listener instances we created, keyed by their consumer group id.
    private final Map<String, MyListener> listeners = new HashMap<>();

    /**
     * Creates one listener container per configured cluster at startup.
     * For each cluster the shared container factory is re-pointed at that
     * cluster's bootstrap servers before the prototype listener bean is
     * fetched, so each resulting container consumes from a different cluster.
     */
    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry) {

        return args -> {
            int index = 0;
            for (String cluster : props.getClusters()) {
                Map<String, Object> overrides = new HashMap<>(cf.getConfigurationProperties());
                overrides.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + index++;
                overrides.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                // Mutate the factory; the prototype bean created next picks this up.
                containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(overrides));
                this.listeners.put(groupId, context.getBean("listener", MyListener.class));
            }
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
        };
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

}

class MyListener {

    /** Prints each record received from the {@code so55311070} topic. */
    @KafkaListener(topics = "so55311070")
    public void listen(String message) {
        System.out.println(message);
    }

}

/**
 * Binds the {@code kafka.clusters} property: one entry per cluster, each a
 * comma-separated bootstrap-server list.
 */
@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties {

    private String[] clusters;

    public String[] getClusters() {
        // Defensive copy: the original returned the internal array, letting
        // callers mutate the bound configuration in place.
        return this.clusters == null ? null : this.clusters.clone();
    }

    public void setClusters(String[] clusters) {
        this.clusters = clusters == null ? null : clusters.clone();
    }

}
kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

Result

group0
group1
...
2019-03-23 11:43:25.993  INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
    : partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994  INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
    : partitions assigned: [so55311070-0]

EDIT

Add code to retry starting failed containers.

It turns out we don't need a local map of listeners; the registry has a map of all containers, including the ones that failed to start.

@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    // Set when any container fails to start; drives the retry scheduling below.
    private boolean atLeastOneFailure;

    // Handle to the periodic retry task so it can be cancelled once all containers run.
    private ScheduledFuture<?> restartTask;

    /**
     * Creates one listener container per configured cluster and, if any failed
     * to start (e.g. its cluster was unreachable at boot), schedules a periodic
     * task that keeps retrying the failed containers until all are running.
     */
    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                attemptStart(containerFactory, context, consumerProps, groupId);
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            if (this.atLeastOneFailure) {
                Runnable rescheduleTask = () -> {
                    // BUG FIX: reset the flag once per pass. The original reset it
                    // inside the per-container loop, so a healthy container processed
                    // after a failed one cleared the flag and cancelled the retry
                    // task while some containers were still down.
                    this.atLeastOneFailure = false;
                    registry.getListenerContainers().forEach(c -> {
                        if (!c.isRunning()) {
                            System.out.println("Attempting restart of " + c.getGroupId());
                            try {
                                c.start();
                            }
                            catch (Exception e) {
                                System.out.println("Failed to start " + e.getMessage());
                                this.atLeastOneFailure = true;
                            }
                        }
                    });
                    if (!this.atLeastOneFailure) {
                        this.restartTask.cancel(false);
                    }
                };
                this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                        Instant.now().plusSeconds(60),
                        Duration.ofSeconds(60));
            }
        };
    }

    /**
     * Re-points the shared factory at the given cluster and fetches a prototype
     * listener bean. A BeanCreationException (cluster unreachable) is recorded
     * rather than propagated so the remaining clusters still get containers.
     */
    private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
        try {
            context.getBean("listener", MyListener.class);
        }
        catch (BeanCreationException e) {
            this.atLeastOneFailure = true;
        }
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

    @Bean
    public TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

}

class MyListener {

    /** Prints each record received from the {@code so55311070} topic. */
    @KafkaListener(topics = "so55311070")
    public void listen(String message) {
        System.out.println(message);
    }

}

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Thanks a lot Gary for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, while starting the application if one of the mentioned clusters is unavailable, the consumer keeps retrying for the connection and then eventually times out bringing down the entire application. While I am looking for an approach even if one of the mentioned cluster is unavailable my other consumer must keep running(which is successfully connected to the available cluster). – Iqbal Mar 24 '19 at 06:47
  • I am getting the exception-java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps because since one of my cluster is unavailable listener bean is not getting initialized properly and hence is failing. – Iqbal Mar 24 '19 at 06:51
  • Simply put `try{...} catch(...) {...}` around the call to `getBean()`. You can speed up the failure by setting the `default.api.timeout.ms` property; e.g. `spring.kafka.consumer.properties.default.api.timeout.ms=5000`. – Gary Russell Mar 24 '19 at 13:27
  • Thanks for the suggestion, it worked. Is there a way I can have the consumer polling for the connection, so that once the cluster becomes available the consumer must get connected and start consuming messages. – Iqbal Mar 25 '19 at 08:01
0
  1. Create a template class for future listeners:

     /** Shared message handler used as the target bean of each dynamically registered listener. */
     public class KafkaTemplateListener implements MessageListener<String, String> {

         @Override
         public void onMessage(ConsumerRecord<String, String> consumerRecord) {
             // Same output as before; only the parameter name differs.
             System.out.println("RECORD PROCESSING: " + consumerRecord);
         }
     }
    
  2. Create KafkaListenerEndpoint with the implemented template:

     /** Builds programmatic Kafka listener endpoints targeting {@link KafkaTemplateListener}. */
     @Service
     public class KafkaListenerCreator {
         String kafkaGroupId = "kafkaGroupId";
         String kafkaListenerId = "kafkaListenerId-";
         // Shared counter so every generated endpoint id is unique.
         static AtomicLong endpointIdIndex = new AtomicLong(1);

         /** Creates an endpoint for the topic, wired to KafkaTemplateListener.onMessage. */
         private KafkaListenerEndpoint createKafkaListenerEndpoint(String topic) {
             MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
                 createDefaultMethodKafkaListenerEndpoint(topic);
             kafkaListenerEndpoint.setBean(new KafkaTemplateListener());
             try {
                 kafkaListenerEndpoint.setMethod(KafkaTemplateListener.class
                     .getMethod("onMessage", ConsumerRecord.class));
             } catch (NoSuchMethodException e) {
                 // Preserve the cause instead of flattening it into the message.
                 throw new RuntimeException("Attempt to call a non-existent method", e);
             }
             return kafkaListenerEndpoint;
         }

         /** Common endpoint settings: unique id, group, auto-start, topic. */
         private MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(String topic) {
             MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
             kafkaListenerEndpoint.setId(generateListenerId());
             kafkaListenerEndpoint.setGroupId(kafkaGroupId);
             kafkaListenerEndpoint.setAutoStartup(true);
             kafkaListenerEndpoint.setTopics(topic);
             kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
             return kafkaListenerEndpoint;
         }

         private String generateListenerId() {
             // BUG FIX: the original referenced an undeclared field
             // "kafkaGeneralListenerEndpointId" (compile error); the declared
             // prefix field is kafkaListenerId.
             return kafkaListenerId + endpointIdIndex.getAndIncrement();
         }
     }
    
  3. Register KafkaListenerEndpointRegistry with the endpoint:

     /**
      * Registers a new listener container for the given topic at runtime,
      * using the endpoint produced by createKafkaListenerEndpoint(String).
      */
     @Service
     public class KafkaListenerCreator {
         //... HERE HAS TO BE VARIABLES FROM PREVIOUS EXAMPLE

         @Autowired
         private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
         // Wildcard instead of the raw type: registerListenerContainer accepts
         // KafkaListenerContainerFactory<?>, and raw types defeat generic checks.
         @Autowired
         private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;

         public void createAndRegisterListener(String topic) {
             KafkaListenerEndpoint listener = createKafkaListenerEndpoint(topic);
             // true => start the container immediately.
             kafkaListenerEndpointRegistry.registerListenerContainer(listener, kafkaListenerContainerFactory, true);
         }

         //... HERE HAS TO BE METHODS FROM PREVIOUS EXAMPLE

     }
    
Valery Putnin
  • 61
  • 1
  • 4