I am using a Kafka consumer. I upgraded from Spring Boot 1.5 to 2.6, and now the application fails to start with a NullPointerException. Could anyone help me with this issue?
Caused by: java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) ~[?:1.8.0_25]
at java.util.concurrent.ConcurrentHashMap.putAll(ConcurrentHashMap.java:1084) ~[?:1.8.0_25]
at java.util.concurrent.ConcurrentHashMap.<init>(ConcurrentHashMap.java:852) ~[?:1.8.0_25]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:129) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.<init>(DefaultKafkaConsumerFactory.java:98) ~[spring-kafka-2.8.4.jar:2.8.4]
Failed to instantiate [org.springframework.kafka.core.ConsumerFactory
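From the stack trace, it looks to me like one of the values in my consumer config map is null: the DefaultKafkaConsumerFactory constructor in spring-kafka 2.8 copies the configs into a ConcurrentHashMap, which rejects null keys and values, whereas the version that came with Boot 1.5 seems to have tolerated them. A minimal standalone snippet that reproduces the same exception (this is only my assumption about the cause, not my actual config):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NullConfigValueRepro {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // a HashMap happily stores a null value, so building the config map succeeds
        props.put("group.id", null);
        // ConcurrentHashMap rejects null keys and values, so the copy throws a
        // NullPointerException, matching the putVal/putAll frames in the trace above
        new ConcurrentHashMap<>(props);
    }
}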
pom.xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.6</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.2.12.RELEASE</version>
</dependency>
ReceiverConfigClass:
@Slf4j
@Configuration
@EnableKafka
@ExcludeKafkaReceiver
public class XXTriggerKakfaReceiverConfig {
    @Autowired
    xxTriggerKafkaManagedBean kafkaListenerConfig;

    @Bean
    public Map<String, Object> xxConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerConfig.getKafkaServerConfig());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageSerializerType.getValue(kafkaListenerConfig.getValueDeserializerClass()));
        props.put(Constants.AVRO_MSG_CLASS_TYPE, kafkaListenerConfig.getAvroMessageClassType());
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, kafkaListenerConfig.getPartitionAssignmentStrategy());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerConfig.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerConfig.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerConfig.getEnableAutoCommit());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getSessionTimeoutMS());
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerConfig.getHeartbeatIntervalMS());
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaListenerConfig.getRequestTimeoutMS());
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaListenerConfig.getFetchMaxWaitMS());
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, kafkaListenerConfig.getReconnectBackoffMS());
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaListenerConfig.getRetryBackoffMS());
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaListenerConfig.getMaxPartitionFetchSize());
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, kafkaListenerConfig.getFetchMinBytes());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerConfig.getMaxPollRecords());
        return props;
    }
    @Bean
    public ConsumerFactory<String, Object> xxConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(xxConsumerConfigs()); // this is the line that throws the NullPointerException
    }
    /**
     * A concurrent Kafka listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler();
        errorHandler.setAckAfterHandle(false);
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        return factory;
    }
    /**
     * A concurrent Kafka batch listener with concurrency managed through {concurrent_kafka_listeners} property value.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> xxKafkaBatchListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xxConsumerFactory());
        factory.setConcurrency(kafkaListenerConfig.getNumberOfConcurrentListeners());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ErrorHandler errorHandler = new KafkaListenerErrorHandler();
        errorHandler.setAckAfterHandle(false);
        factory.setCommonErrorHandler((CommonErrorHandler) errorHandler);
        factory.setBatchListener(true);
        return factory;
    }
}
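To narrow down which property is actually coming back null after the upgrade, I am thinking of temporarily replacing the consumer factory bean with something like the sketch below (the kafkaListenerConfig getters are from my own managed bean, so this only illustrates the idea):

@Bean
public ConsumerFactory<String, Object> xxConsumerFactory() {
    Map<String, Object> props = xxConsumerConfigs();
    // temporary diagnostic: DefaultKafkaConsumerFactory copies this map into a
    // ConcurrentHashMap, which rejects null values, so log any property that is still null
    props.forEach((key, value) -> {
        if (value == null) {
            log.error("Kafka consumer property '{}' resolved to null", key);
        }
    });
    return new DefaultKafkaConsumerFactory<>(props);
}

My guess is that one of the properties that bound fine under Boot 1.5 no longer binds after the upgrade, so the corresponding getter on xxTriggerKafkaManagedBean returns null, but I have not been able to confirm which one.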