3

What is need

I'm writing an application (Spring + Kotlin) that takes information with Kafka. If I set autoStartup = "true" when declaring a @KafkaListener then the app works fine but only if broker is available. When the broker is unavailable application crashes on start. It's undesirable behavior. The application must work and perform other functions.

What I tried to do

For the escape of crashing application on start somebody on this site in another topic advised setting autoStartup = "false" when declaring a @KafkaListener. And it really helped to prevent crash on start. But now I cannot successfully start KafkaListener manually. In other examples I saw auto wiring of KafkaListenerEndpointRegistry, but when I trying to do it:

@Service
class KafkaConsumer @Autowired constructor(
        private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {

IntelliJ Idea warns:

Could not autowire. No beans of 'KafkaListenerEndpointRegistry' type found.

When I try to use KafkaListenerEndpointRegistry without autowiring and perform this code:

@Service
class KafkaConsumer {
    private val logger = LoggerFactory.getLogger(this::class.java)
    private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()

    @Scheduled(fixedDelay = 10000)
    fun startCpguListener(){
        val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
        if (!container.isRunning)
            try {
                logger.info("Kafka Consumer is not running. Trying to start...")
                container.start()
            } catch (e: Exception){
                logger.error(e.message)
            }
    }

    @KafkaListener(
            id = "consumer1",
            topics = ["cpgdb.public.user"],
            autoStartup = "false"
    )
    private fun listen(it: ConsumerRecord<JsonNode, JsonNode>, qwe: Consumer<Any, Any>){
        val pay = it.value().get("payload")
        val after = pay.get("after")
        val id = after["id"].asInt()
        
        val receivedUser = CpguUser(
                id = id,
                name = after["name"].asText()
        ) 
        logger.info("received user with id = $id")
        }
    }
}

kafkaListenerEndpointRegistry.getListenerContainer("consumer1") always return null. I guess it's because I didn't auto wire kafkaListenerEndpointRegistry. How can I do it? Or if exist another solution of my answer I'll be appreciative any help! Thanks!

There is Kafka config:

@Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration(private val props: KafkaProperties) {

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        factory.setMessageConverter(MessagingMessageConverter())
        factory.setStatefulRetry(true)

        val retryTemplate = RetryTemplate()
        retryTemplate.setRetryPolicy(AlwaysRetryPolicy())
        retryTemplate.setBackOffPolicy(ExponentialBackOffPolicy())
        factory.setRetryTemplate(retryTemplate)
        val handler = SeekToCurrentErrorHandler()
        handler.isAckAfterHandle = false
        factory.setErrorHandler(handler)
        factory.containerProperties.isMissingTopicsFatal = false

        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<Any, Any> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to props.bootstrap.address,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(MonitoringConsumerInterceptor::class.java),
                ConsumerConfig.CLIENT_ID_CONFIG to props.receiver.clientId,
                ConsumerConfig.GROUP_ID_CONFIG to props.receiver.groupId,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
                ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true
        )
    }
}
  • spring boot version: 2.3.0
  • spring-kafka version: 2.5.3
  • kafka-clients version: 2.5.0
avocadoLambda
  • 1,332
  • 7
  • 16
  • 33
Enbirr
  • 105
  • 2
  • 7

1 Answers1

7

Just ignore IntelliJ's warning about the auto wiring; the bean does exist; it's just that IntelliJ can't detect it.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Thanks! I tried to run the application despite of IDE warnings and it's really works correctly. – Enbirr Jul 07 '20 at 04:21
  • It's sad that a year has passed but the latest version of IDEA Ultimate (2021.2) still has the same problem. – Inego Aug 10 '21 at 05:27
  • This issue still persists on Intellij. Any help to fix the error ? Ive tried following this post, but no success - https://stackoverflow.com/questions/21323309/intellij-idea-shows-errors-when-using-springs-autowired-annotation – V1666 Dec 16 '21 at 23:19