
I am trying to create the simplest possible hello world with Spring Cloud + Kafka Streams + Spring Boot 2.

I realize I am missing some basic concepts. Basically, I understand that:

1 - I need to define an outbound stream to write messages to a Kafka topic, and an inbound stream to read messages from a Kafka topic

public interface LoansStreams {

    String INPUT = "loans-in";
    String OUTPUT = "loans-out";

    @Input(INPUT)
    SubscribableChannel inboundLoans();

    @Output(OUTPUT)
    MessageChannel outboundLoans();

}

2 - configure Spring Cloud Stream to bind to my streams

@EnableBinding(LoansStreams.class)
public class StreamsConfig {
}

3 - configure Kafka properties

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        loans-in:
          destination: loans
          contentType: application/json
        loans-out:
          destination: loans
          contentType: application/json

4 - create a model for the exchanged messages

@Getter @Setter @ToString @Builder
public class Loans {
    private long timestamp;
    private String result;
}

5 - write to Kafka

@Service
@Slf4j
public class LoansService {
    private final LoansStreams loansStreams;
    public LoansService(LoansStreams loansStreams) {
        this.loansStreams = loansStreams;
    }
    public void sendLoan(final Loans loans) {
        log.info("Sending loans {}", loans);
        MessageChannel messageChannel = loansStreams.outboundLoans();
        messageChannel.send(MessageBuilder
                .withPayload(loans)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

6 - listen to Kafka topic

@Component
@Slf4j
public class LoansListener {

    @StreamListener(LoansStreams.INPUT)
    public void handleLoans(@Payload Loans loans) {
        log.info("Received results: {}", loans);

    }
}

I spent a whole day reading a few blogs, and I assume the code above is at least workable. I am not sure I am really following the best approach. In any case, I get the error mentioned in the title:

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 18:33:05.619 ERROR 14784 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.

While googling for a solution, I found someone suggesting that the StreamListener method should return the model, so I replaced it with:

@StreamListener(LoansStreams.INPUT)
@SendTo("loans-out")
public KStream<?, Loans> process(KStream<?, Loans> l) {
    log.info("Received: {}", l);
    return l;
}

and then I get an error that is even less clear, at least to me (the previous error clearly mentioned a binder issue):

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 19:01:06.016 ERROR 13276 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalArgumentException: Method must be declarative
        at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:510) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:168) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:226) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]

In case it helps, I want to evolve this idea to apply the Saga pattern, but that is not the focus of this question. First, I need to get the basics up and running.

*edited

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.mybank</groupId>
    <artifactId>kafka-cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-cloud-stream</name>
    <description>Spring Cloud Stream With Kafka</description>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <!-- version>5.1.5.RELEASE</version-->
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
Jim C
  • You don't need to include `KStream` with your `StreamListener` since you don't have any `KStream` bindings. Which binder do you include in your build configuration (Maven/Gradle etc.)? How do you include it? Can we see a sample config? Your steps 1 - 6 should work; something might be missing. A small sample app would help. – sobychacko Apr 26 '19 at 22:34
  • Yes, your interface is for the message channel binder, not the KStream binder. It's also a bit odd to read and write from/to the same topic in an app. You'll get an infinite loop. – Gary Russell Apr 27 '19 at 00:52
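
The infinite loop mentioned in the last comment can be pictured with a toy in-memory model. This is only a sketch in plain Java, not Kafka or Spring code: a "topic" is a queue, and the listener is assumed to forward every message it reads to its output (as the `@SendTo` variant in the question does). The class name, the method name, and the topic name `loans-processed` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model (not Kafka): each topic is an in-memory queue, and a listener
// forwards every message it reads from inTopic to outTopic.
class SameTopicLoopSketch {

    /**
     * Processes at most maxSteps messages from inTopic, forwarding each one to
     * outTopic, and returns how many messages are still waiting on inTopic.
     */
    static int pendingAfter(String inTopic, String outTopic, int maxSteps) {
        Map<String, Deque<String>> topics = new HashMap<>();
        topics.computeIfAbsent(inTopic, k -> new ArrayDeque<>());
        topics.computeIfAbsent(outTopic, k -> new ArrayDeque<>());

        topics.get(inTopic).add("loan-1"); // a single initial message

        for (int step = 0; step < maxSteps; step++) {
            String message = topics.get(inTopic).poll();
            if (message == null) {
                break; // input drained, the listener is idle
            }
            topics.get(outTopic).add(message); // forward the message
        }
        return topics.get(inTopic).size();
    }

    public static void main(String[] args) {
        // Same topic for input and output: the message keeps coming back.
        System.out.println(pendingAfter("loans", "loans", 1000));           // prints 1
        // Separate topics: the input drains after one step.
        System.out.println(pendingAfter("loans", "loans-processed", 1000)); // prints 0
    }
}
```

In the question both bindings use `destination: loans`, so pointing the output binding at a different destination would be the equivalent of the second call.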

4 Answers


For the error "A default binder has been requested, but there are no binders available ...", add the dependency below.

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
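
Why the extra dependency helps can be pictured with a toy model of binder selection. This is a simplified sketch in plain Java, not Spring Cloud Stream's actual code: it assumes each binder on the classpath declares the type of binding target it supports, and that startup fails when no single binder matches the target type. The nested interfaces are stand-ins for the real classes, and the binder names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: the nested types are stand-ins, not the real Spring or Kafka classes.
class BinderLookupSketch {

    interface MessageChannel {} // stand-in for Spring's MessageChannel
    interface KStream {}        // stand-in for Kafka Streams' KStream
    interface KTable {}         // stand-in for Kafka Streams' KTable

    /** Binders that, in this sketch, the kafka-streams binder artifact contributes. */
    static Map<String, Class<?>> kafkaStreamsBinders() {
        Map<String, Class<?>> binders = new LinkedHashMap<>();
        binders.put("kstream", KStream.class);
        binders.put("ktable", KTable.class);
        return binders;
    }

    /** Adds what, in this sketch, the plain kafka binder artifact contributes. */
    static Map<String, Class<?>> withKafkaBinder(Map<String, Class<?>> binders) {
        Map<String, Class<?>> all = new LinkedHashMap<>(binders);
        all.put("kafka", MessageChannel.class);
        return all;
    }

    /** Picks the single binder whose supported type matches the binding target. */
    static String resolve(Map<String, Class<?>> binders, Class<?> targetType) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, Class<?>> entry : binders.entrySet()) {
            if (entry.getValue().isAssignableFrom(targetType)) {
                candidates.add(entry.getKey());
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException("no unique binder for " + targetType.getSimpleName());
        }
        return candidates.get(0);
    }

    public static void main(String[] args) {
        Map<String, Class<?>> streamsOnly = kafkaStreamsBinders();
        try {
            // The question's interface declares MessageChannel bindings.
            resolve(streamsOnly, MessageChannel.class);
        } catch (IllegalStateException e) {
            System.out.println("startup fails: " + e.getMessage());
        }
        // With the message-channel binder present, the lookup succeeds.
        System.out.println(resolve(withKafkaBinder(streamsOnly), MessageChannel.class)); // prints kafka
    }
}
```

In this model the question's pom offers only KStream/KTable binders, while `LoansStreams` declares `MessageChannel` and `SubscribableChannel` bindings, so no binder matches until the message-channel binder is added.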
todaynowork
  • I already have spring-cloud-stream-binder-kafka-streams in my pom. I will add my whole pom above. – Jim C Apr 29 '19 at 18:32
  • Please note that it is spring-cloud-stream-binder-kafka, not spring-cloud-stream-binder-kafka-**streams**. – todaynowork Apr 30 '19 at 00:58
  • You can also add both of them: spring-cloud-stream-binder-kafka is for the Kafka client API, while spring-cloud-stream-binder-kafka-streams is for the Kafka Streams API. – todaynowork Apr 30 '19 at 01:17
  • Probably a very naive question: can you illustrate the difference between ...binder-kafka and ...binder-kafka-streams? I mean, when would I use one instead of the other? I assumed that ...binder-kafka-streams would offer everything I get from binder-kafka plus the streams features. – Jim C Apr 30 '19 at 14:37
  • @jimC - you use ..binder-kafka-streams only when your application is a Kafka Streams based application. By that I mean you are using the Kafka Streams library in your application and make use of either the high-level DSL or the low-level Processor API provided by Kafka Streams. You use ..binder-kafka when you want to leverage Spring Integration, Spring Cloud Function, or Project Reactor based constructs. They are very different binders under the hood. – sobychacko Apr 30 '19 at 16:58
  • I am still confused about when to use one in favour of the other. I am googling around without success. If you can write just one sentence on when to use one in favour of the other, it would be useful. PS: I read https://stackoverflow.com/questions/46829362/what-is-the-difference-between-kafka-streams-dsl-to-processor-api but it doesn't answer this. In my case, I want to use Kafka to apply the Saga pattern. In a few words, imagine I have two microservices, LOAN and BANK-STATEMENT, and I use the Saga pattern. I am fairly confident that using kafka-streams would be very useful in that scenario. – Jim C Apr 30 '19 at 20:05
  • A short hint: the Kafka client is used for producer (Source), consumer (Sink), transform (INPUT->OUTPUT), and process (INPUT->OUTPUT) style applications, and it binds to a message channel; you could use the Rabbit binder instead. Kafka Streams is an advanced alternative for application processing (join, branch, filter, map values and so forth), and it binds to the Kafka topic directly. – todaynowork May 10 '19 at 07:56
  • Something worked here, thanks! Got java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer. – Sachin Sridhar Jul 02 '19 at 05:51
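
The two programming styles contrasted in the comments above can be pictured with a loose analogy in plain Java. This is not Spring or Kafka code: a per-message handler stands in for the message-channel style, and a `java.util.stream` pipeline stands in for the Kafka Streams DSL (filter, map and so on). The class and method names and the sample payloads are hypothetical, and unlike this finite list, a real Kafka Streams topology runs continuously over an unbounded stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Analogy only: neither method uses Spring Cloud Stream or Kafka Streams.
class BinderStyleSketch {

    /** Message-channel style: a handler is invoked once for each incoming message. */
    static List<String> perMessageStyle(List<String> loans) {
        List<String> log = new ArrayList<>();
        Consumer<String> handler = loan -> log.add("Received: " + loan);
        for (String loan : loans) {
            handler.accept(loan);
        }
        return log;
    }

    /** Streams-DSL style: the processing is declared once as a pipeline of operators. */
    static List<String> pipelineStyle(List<String> loans) {
        return loans.stream()
                .filter(loan -> loan.startsWith("approved"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> loans = Arrays.asList("approved:42", "rejected:7");
        System.out.println(perMessageStyle(loans)); // one log entry per message
        System.out.println(pipelineStyle(loans));   // only the filtered, mapped records
    }
}
```

Read this way, the code in the question (a `@StreamListener` handling one `Loans` payload at a time and a `MessageChannel` for sending) follows the first style.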

You can define your default binder in application.yml (or application.properties)

spring:
  cloud:
    stream:
      bindings:
        ...
      default-binder: kafka
fg78nc

For me, with different application.properties files for different contexts and multiple output bindings, the only way I could fix it was to define a general default binder like:

spring:
  cloud:
    stream:
      default-binder: eventhub
      ...

The binder type for each of the remaining bindings is then set individually on each input/output as well.

gcpdev

In the above pom file, you need to use binder-kafka and not binder-kafka-streams

So replace

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

With

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
Amrut Prabhu