
I am using a Kafka consumer. I upgraded from Spring Boot 1.5 to 2.6, and now the application fails to start with a NullPointerException. Please let me know if anyone can help with this issue.

Caused by: java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) ~[?:1.8.0_25]
    at java.util.concurrent.ConcurrentHashMap.putAll(ConcurrentHashMap.java:1084) ~[?:1.8.0_25]
    at java.util.concurrent.ConcurrentHashMap.<init>(ConcurrentHashMap.java:852) ~[?:1.8.0_25]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:129) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:98) ~[spring-kafka-2.8.4.jar:2.8.4]

Failed to instantiate [org.springframework.kafka.core.ConsumerFactory

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.6</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.2.12.RELEASE</version>
</dependency>

ReceiverConfigClass:

@Slf4j
@Configuration
@EnableKafka
@ExcludeKafkaReceiver
public class XXTriggerKakfaReceiverConfig {
   
    @Autowired
    xxTriggerKafkaManagedBean kafkaListenerConfig;

    @Bean
    public Map<String, Object> xxConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerConfig.getKafkaServerConfig());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageSerializerType.getValue(kafkaListenerConfig.getValueDeserializerClass()));
        props.put(Constants.AVRO_MSG_CLASS_TYPE, kafkaListenerConfig.getAvroMessageClassType());
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, kafkaListenerConfig.getPartitionAssignmentStrategy());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerConfig.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerConfig.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerConfig.getEnableAutoCommit());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getSessionTimeoutMS());
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerConfig.getHeartbeatIntervalMS());
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getRequestTimeoutMS());
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaListenerConfig.getFetchMaxWaitMS());
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaListenerConfig.getReconnectBackoffMS());
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaListenerConfig.getRetryBackoffMS());
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaListenerConfig.getMaxPartitionFetchSize());
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaListenerConfig.getFetchMinBytes());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerConfig.getMaxPollRecords());

        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> xxConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(xxConsumerConfigs());  // This is the line throwing the NullPointerException
    }

    /**
     * A concurrent Kafka listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance 
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler();
        errorHandler.setAckAfterHandle(false);        
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        return factory;
    }

    /**
     * A concurrent Kafka batch listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaBatchListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler(); 
        errorHandler.setAckAfterHandle(false);        
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        factory.setBatchListener(true);
        return factory;
    }
  • That exception is thrown on the `this.configs = new ConcurrentHashMap<>(configs);`. So, looks like something in your `xxConsumerConfigs` is really `null`. – Artem Bilan May 11 '22 at 15:12
  • Spring Boot 2.6 uses Spring 5.3, you have Spring 5.2 dependencies in there. – M. Deinum May 11 '22 at 15:40
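Following up on Artem Bilan's comment: a minimal diagnostic sketch (not part of the original post) that logs any null entries in the consumer config map before it is handed to DefaultKafkaConsumerFactory, since the factory copies the map into a ConcurrentHashMap, which rejects null keys and values. The helper name logNullConfigEntries is hypothetical.

    import java.util.Map;

    // Hypothetical helper: call with the map returned by xxConsumerConfigs()
    // to see which property getter resolved to null after the upgrade.
    static void logNullConfigEntries(Map<String, Object> props) {
        props.forEach((key, value) -> {
            if (key == null || value == null) {
                // ConcurrentHashMap.putAll() throws NullPointerException on null keys or values
                System.err.println("Null Kafka consumer config entry for key: " + key);
            }
        });
    }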

1 Answer


I was facing the same error: before the Spring Boot upgrade (v2.2.4 -> v2.7.3) the code worked fine, and after the upgrade it started throwing a NullPointerException. I would strongly recommend debugging the code and stepping into the Spring libraries to understand which exact property is failing. In my case, it was

    SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG=
or
    ssl.endpoint.identification.algorithm=

which had been working fine as null before the upgrade. After the upgrade, it had to be set to an empty String due to changes in the Kafka library.
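Concretely, the fix amounted to putting an explicit empty string into the consumer config map instead of leaving the value null. A rough sketch follows; the disableSslHostnameVerification helper name is mine, not from the original code, and props is whatever map you pass to DefaultKafkaConsumerFactory.

    import java.util.Map;
    import org.apache.kafka.common.config.SslConfigs;

    // Hypothetical helper: set the algorithm to an empty string rather than null.
    // An empty string disables hostname verification explicitly; a null value now
    // fails when DefaultKafkaConsumerFactory copies the map into a ConcurrentHashMap.
    static void disableSslHostnameVerification(Map<String, Object> props) {
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    }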

Found help from this answer: https://stackoverflow.com/a/56648768/4898612.

Good luck!