I am trying to create a Spring Boot application using Spring Cloud Stream with the Kafka Streams binder. It should read input from Kafka cluster 1 and send it to Kafka cluster 2 within a single Kafka Streams application.

I am getting the following exception during startup:

***************************
APPLICATION FAILED TO START
***************************

Description:

A component required a bean named 'sessionBrokers-KafkaStreamsBinderConfigurationProperties' that could not be found.


Action:

Consider defining a bean named 'sessionBrokers-KafkaStreamsBinderConfigurationProperties' in your configuration.

My configuration is as follows:

spring.cloud.function.definition=itemConsumedStream

#Input Configuration
spring.cloud.stream.bindings.itemConsumedStream-in-0.binder=sessionBrokers
spring.cloud.stream.bindings.itemConsumedStream-in-0.destination=${session.event.data.input.topic}
spring.cloud.stream.kafka.bindings.itemConsumedStream-in-0.consumer.configuration.key.deserializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.itemConsumedStream-in-0.consumer.configuration.value.deserializer=in.custom.JsonNodeDeserializer

#Output Configuration
spring.cloud.stream.bindings.itemConsumedStream-out-0.binder=searchBrokers
spring.cloud.stream.bindings.itemConsumedStream-out-0.destination=${itemConsumed.events.output.topic}
spring.cloud.stream.kafka.bindings.itemConsumedStream-out-0.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.itemConsumedStream-out-0.producer.configuration.value.serializer=in.custom.JsonSerializer


#Input Binder Configurations
spring.cloud.stream.binders.sessionBrokers.type=kafka
spring.cloud.stream.binders.sessionBrokers.environment.spring.cloud.stream.kafka.streams.binder.brokers=${session.event.data.input.kafka.brokers}

#Output Binder Configurations
spring.cloud.stream.binders.searchBrokers.type=kafka
spring.cloud.stream.binders.searchBrokers.environment.spring.cloud.stream.kafka.binder.brokers=${search.kafka.brokers}

My pom uses Spring Boot version 2.7.1 and Spring Cloud version 2021.0.0, with the following dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

This works if I use a single binder, i.e.

spring.cloud.stream.kafka.streams.binder.brokers=${session.event.data.input.kafka.brokers}

instead of multiple binders.

  • Why do you need both binders in the application? Do you have a use case that supports it? Can you create a simple sample application to reproduce this issue? – sobychacko Sep 09 '22 at 18:31
  • @sobychacko We need to filter session events from cluster 1 into cluster 2, based on the use case we are solving. Please find a sample app at https://github.com/ketanyad/kafka-streams-across-cluster – Ketan Yadav Sep 10 '22 at 04:59
  • I am only seeing a single Kafka Streams function in this application and there are no regular Kafka functions; therefore, I don't think you need to include the regular kafka-binder dependency. Also, remember that Kafka Streams does not support multiple clusters (i.e., you cannot receive data from one cluster and produce to another within a single processor). – sobychacko Sep 12 '22 at 14:03
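
Following the last comment, one possible direction is to move the cross-cluster step off Kafka Streams and onto the regular Kafka binder, which can be declared once per cluster. The fragment below is only a sketch under assumptions: `itemConsumedFilter` is a hypothetical function name, and it assumes the bean is a plain `java.util.function.Function` rather than a `KStream` function. The binder names and property placeholders are reused from the question. Serializer and deserializer settings are left out and would still need to be configured.

```properties
# Hypothetical function name; assumes a java.util.function.Function bean, not a KStream function
spring.cloud.function.definition=itemConsumedFilter

# Input binding reads from cluster 1 through the sessionBrokers binder
spring.cloud.stream.bindings.itemConsumedFilter-in-0.binder=sessionBrokers
spring.cloud.stream.bindings.itemConsumedFilter-in-0.destination=${session.event.data.input.topic}

# Output binding writes to cluster 2 through the searchBrokers binder
spring.cloud.stream.bindings.itemConsumedFilter-out-0.binder=searchBrokers
spring.cloud.stream.bindings.itemConsumedFilter-out-0.destination=${itemConsumed.events.output.topic}

# Both binders are regular Kafka binders, each pointing at its own cluster
spring.cloud.stream.binders.sessionBrokers.type=kafka
spring.cloud.stream.binders.sessionBrokers.environment.spring.cloud.stream.kafka.binder.brokers=${session.event.data.input.kafka.brokers}
spring.cloud.stream.binders.searchBrokers.type=kafka
spring.cloud.stream.binders.searchBrokers.environment.spring.cloud.stream.kafka.binder.brokers=${search.kafka.brokers}
```

With this layout every binder environment uses the `spring.cloud.stream.kafka.binder` prefix, so no Kafka Streams binder properties are involved. Whether it fits depends on whether the filtering needs Kafka Streams features such as state stores or joins.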
