1

I am trying to configure a Spring Boot application to consume Kafka messages. After adding:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.3.RELEASE</version>
</dependency>

to my dependencies and using the @EnableKafka and @KafkaListener(topics = "some-topic") annotations, I get the following error:

...
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'kafkaListenerContainerFactory' available

Then I added the following configuration:

@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> propsMap = new HashMap<>();

    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return propsMap;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {

    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

The error is gone. However, I think I should be able to autoconfigure this with the spring.kafka.listener.* properties, as the documentation suggests.

If I cannot, I would like to use an autowired KafkaProperties. To make that class available, I added:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency>

Then it is available to import. When I try to use it as follows:

@Autowired
private KafkaProperties kafkaProperties;

and in my method:

 return kafkaProperties.buildConsumerProperties();

I am getting the following error:

Caused by: java.lang.ClassNotFoundException: org.springframework.boot.context.annotation.DeterminableImports

I assume this is a Maven dependency problem.

So my questions are:

  1. Is it possible to configure Kafka using only application.properties, without declaring any @Beans?
  2. If not, how can I avoid building the Map manually as above and simply call kafkaProperties.buildConsumerProperties() without getting the second error (the ClassNotFoundException)?
Gary Russell
Hasan Can Saral

3 Answers

2

Go to http://start.spring.io and select Kafka; it will generate a project skeleton that is ready to go.

Pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafkademo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafkademo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

props...

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=myGroup

App...

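import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class KafkademoApplication {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);
        // Keep the application running until Enter is pressed, then shut down cleanly
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    // No container factory @Bean is declared; Boot's auto configuration provides it
    @KafkaListener(topics="so8400in")
    public void listen(String in) {
        System.out.println(in);
    }

}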

EDIT

I changed the listener to

@KafkaListener(topics="so8400in")
public void listen(String in) {
    System.out.println("Received message:" + in);
}

Then sent this message...

$ kafka-console-producer --broker-list localhost:9092 --topic so8400in
test message for so42703487

...and on the console...

Received message:test message for so42703487

I can post the project on GitHub if you need further proof.
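
As a rough sketch, the manual bean configuration from the question could be expressed through Boot's spring.kafka.* properties along the following lines. The values are copied from the question; the exact set of supported keys depends on the Spring Boot version, so check them against the Kafka section of the application properties reference for the version in use. Passing the session timeout through spring.kafka.properties.* is an assumption here, since it may not have a dedicated key in every version.

```properties
# Broker address shared by all clients (the question uses localhost:9092)
spring.kafka.bootstrap-servers=localhost:9092

# Consumer settings mirroring consumerConfigs() in the question
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Assumption: arbitrary Kafka client settings are passed through spring.kafka.properties.*
spring.kafka.properties.session.timeout.ms=15000

# Listener container settings mirroring kafkaListenerContainerFactory() in the question
spring.kafka.listener.concurrency=3
spring.kafka.listener.poll-timeout=3000
```

With properties like these in place, the consumerConfigs(), consumerFactory() and kafkaListenerContainerFactory() beans from the question should no longer be necessary, provided Boot's Kafka auto configuration is active.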

Gary Russell
  • This is exactly what I mean, this configuration as is won't work, with the above mentioned exceptions. – Hasan Can Saral Mar 09 '17 at 20:08
  • It works fine for me with the pom in my answer. No `@Bean`s needed, as you can see. – Gary Russell Mar 09 '17 at 20:11
  • Could you try with `@EnableKafka` annotation? I believe you cannot receive messages with the above application. – Hasan Can Saral Mar 10 '17 at 07:20
  • Boot adds `@EnableKafka` during auto configuration. See [here](https://github.com/spring-projects/spring-boot/blob/master/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java#L62) - I can assure you the above works (I tested it when I wrote it). – Gary Russell Mar 10 '17 at 13:14
  • Please see my answer below. I understand it works, but it's not the answer to my problem. I appreciate your help regardless. – Hasan Can Saral Mar 10 '17 at 13:15
0

It turned out to be a Maven issue, as I suspected. I work on a multi-module project with the following structure:

────parent
    ├───parent.pom
    ├───module1
    |   └───module1.pom
    └───module2
        └───module2.pom

My parent.pom in turn declared its own parent element:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.2.RELEASE</version>
</parent>

Replacing the above parent declaration in parent.pom with:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <type>pom</type>
            <version>1.5.2.RELEASE</version>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

as suggested here, resolved all of the issues (both autoconfiguration and being able to use KafkaProperties).

Hasan Can Saral
0

Starting from Spring Boot 2.7.0, the properties for Kafka consumers and producers have been restructured. Instead of using

spring.kafka.consumer.bootstrap-servers
spring.kafka.producer.bootstrap-servers

you need to use

spring.kafka.bootstrap-servers

for both consumers and producers.

  1. Open the application.properties file in your Spring Boot project.

  2. Add the following property to configure the Kafka bootstrap servers:

    spring.kafka.bootstrap-servers=localhost:9092

Madhvesh S