2

I am trying to consume a kafka topic from spring boot application. I am using Spring cloud stream with below mentioned version

  • Spring-boot-starter-parent: 2.5.7
  • Spring cloud version: 2020.0.4

Below are the code and configuration

application.yml

spring:
  zipkin:
    sender:
      type: kafka
  kafka:
    bootstrap-servers:
    - localhost:19091
  cloud:
    stream:
      bindings:
        audit-in-0:
          destination: com.tonitingaurav.kafka.log
          group: kafka-log-group
          consumer:
            concurrency: 10
            max-attempts: 3
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:19091

Message Consumer Class

@Configuration
public class LogConsumer {

    @Bean
    Consumer<Log> audit(){
        return log -> {
            System.out.println(log.getMessage());
        };
    }
}

Below message publisher is publishing the messages properly. Publisher is written in different micro service.

@Component
public class LogEventPublisher {

    @Autowired
    @Qualifier(LogProducerKafkaConfig.KAFKA_LOG_PUBLISHER)
    MessageChannel messageChannel;

    public void logMessage(Log log) {
        Message<Log> message = MessageBuilder.withPayload(log).build();
        messageChannel.send(message);
    }

}

pom.xml

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
user3082820
  • 91
  • 2
  • 8

1 Answers1

-1

You already posted a very similar question here and the response with two different solutions was provided. Also, here are the samples you can use as starting point - https://github.com/spring-cloud/spring-cloud-stream-samples

Oleg Zhurakousky
  • 5,820
  • 16
  • 17
  • This question is different as it is related to the consumer. Can you please tell me the issue with the attached code. https://github.com/spring-cloud/spring-cloud-stream-samples is a ocean of java classes difficult to learn from this link – user3082820 Nov 29 '21 at 10:08
  • Your initial post doesn't even describe the problem. What does "not working with Kafka" even means? Also, the code above is incomplete nor it represents what the typical s-c-stream application looks like, so what guide, documentation or sample did you follow to create it in the first place? Yes there are many samples specifically to fit as many cases as possible. The one that fits both of your cases is `processor-samples/uppercase-transformer`. If you want us to look at something, please create a complete sample with bare minimum that reproduces the issue, push it to github and post a link – Oleg Zhurakousky Nov 29 '21 at 10:47
  • I want just to consume the messages placed on a kafka topic and consumption is not being done. Please refer https://github.com/tonitingaurav/micro-service-demo/tree/master/event-handler code base. It has a parent MVN project https://github.com/tonitingaurav/micro-service-demo/tree/master/micro-service-demo . I am very new to the spring cloud stream and refering https://stackoverflow.com/questions/65978055/enablebinding-output-input-deprecated-since-version-of-3-1-of-spring-cloud-str to create a kafka consumer. – user3082820 Nov 29 '21 at 12:49
  • ``` spring: cloud: stream: function: definition: transform;sendTestData;receive ``` The only difference in suggested processor-samples/uppercase-transformer is that I have not done above configuration. I tried this but it seems that spring.cloud.stream.function.definition is not available in latest version – user3082820 Nov 29 '21 at 12:57
  • Could you try `spring.cloud.function.definition`? If it still doesn't work for you, I suggest, you create a very minimal sample (without any additional components) that contains a simple consumer and the configuration for that. That way, it becomes very easy to reproduce. With your current repository, it is hard to see where the potential issue is. – sobychacko Nov 29 '21 at 20:09
  • I tried spring.cloud.function.definition too. But still cosumer is not getting the messages. – user3082820 Dec 01 '21 at 11:26