1

I am trying to send a message to rabbitmq with spring cloud stream on application startup. Using the sample code below.

public interface Barista {
    @Input
    SubscribableChannel orders();
}

SpringBoot app which enables bindings

@SpringBootApplication
@EnableBinding(Barista.class)
public class DemoSpringCloudStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoSpringCloudStreamApplication.class, args);
    }
}

Application Runner that should just send a message on startup

@Component
public class Startup implements ApplicationRunner {

    @Autowired
    private Barista barista;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        Message<String> message = MessageBuilder.withPayload("test")
                                             .build();

        barista.orders().send(message);
    }

    //    @StreamListener("orders")
    //    public void handle(String message) {
    //        System.out.println(message);
    //    }
}

The code above produces the exception below unless I uncomment the commented out code above.

java.lang.IllegalStateException: Failed to execute ApplicationRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:791) [spring-boot-2.0.2.RELEASE.jar:2.0.2.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:778) [spring-boot-2.0.2.RELEASE.jar:2.0.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) [spring-boot-2.0.2.RELEASE.jar:2.0.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255) [spring-boot-2.0.2.RELEASE.jar:2.0.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243) [spring-boot-2.0.2.RELEASE.jar:2.0.2.RELEASE]
    at com.example.demospringcloudstream.DemoSpringCloudStreamApplication.main(DemoSpringCloudStreamApplication.java:12) [classes/:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.orders'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=test, headers={id=4170f931-b303-dc96-152b-19d5c3421fb3, contentType=application/json, timestamp=1528930565229}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at com.example.demospringcloudstream.Startup.run(Startup.java:25) ~[classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:788) [spring-boot-2.0.2.RELEASE.jar:2.0.2.RELEASE]
    ... 5 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    ... 9 common frames omitted

I am using Spring Boot 2.0.2 and Sprig Cloud Stream 2.0.0 as shown in the pom below

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>demo-spring-cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>demo-spring-cloud-stream</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.RC2</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>


</project>

According to this accepted answer https://stackoverflow.com/a/42600330/438319 the 'ApplicationRunner' should work. However, I get org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers The app firing the events does not need to listen to the event.

So how do I fire an event with Spring Cloud Stream on application startup without having to listen to the event?

ams
  • 60,316
  • 68
  • 200
  • 288

2 Answers2

2

It actually tells you in the error message exactly what it means.

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.orders'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=test, headers={id=4170f931-b303-dc96-152b-19d5c3421fb3, contentType=application/json, timestamp=1528930565229}]

You are sending a message to application.orders channel but there are no subscribers to that channel.

Oleg Zhurakousky
  • 5,820
  • 16
  • 17
  • Why does the app sending the message have to also subscribe to the channel it's sending the message on? I just want to fire a message on startup and have some one else pickit up in a different app. – ams Jun 13 '18 at 23:41
  • The channel is local so for other application to pick it up it has to work in conjunction with one of the binders (e.g., Kafka, Rabbit etc). Then there is Input/Output type channels and I don't see your full application configuration. In any event I am not clear as to what are you trying to accomplish so I would suggest for you to go through this very quick tutorial (5-10 min) https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_quick_start Then I'd assume you'll have a more pointed question – Oleg Zhurakousky Jun 14 '18 at 00:49
  • I have updated the question to clarify it. I am actually going through a deep dive on Spring Cloud Stream reading the manual and trying out every feature mentioned. This is how I ran into this issue, it seems that I should be able to publish a message with spring cloud stream without having to also listen to subscribe to the channel that I am publishing on. – ams Jun 14 '18 at 03:10
  • You should, but as I stated I only see what you have posted and based on that the error you're seeing makes perfect sense and is expected. Perhaps you can post your sample project on GitHub or somewhere where we can look and see what you really trying to accomplish. – Oleg Zhurakousky Jun 14 '18 at 12:44
  • https://github.com/amsabc/demo-spring-cloud-stream has a repo that reproduces the issues Thanks for your help here. I think this is a bug. – ams Jun 14 '18 at 14:50
  • As I said, you are sending a message to a SubscribableChannel to which there is no subscriber. Barista.input declared as `@Input` binding which absolutely must have a subscriber. If you want to send message out it has to be an `@Oututput` binding. I'll suggest once again, please go thru this quick start guide https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_quick_start. It will only take you 5 min. – Oleg Zhurakousky Jun 14 '18 at 20:45
0

You are sending the message to an input channel. That's why you are getting the error when you have no subscribers to the channel.

In order to send message from your application, try to define your channel as an output channel like:

public interface Barista {
  @Output
  MessageChannel orders();
}

If you actually want your channel as an input channel for any reason, you must add a subscriber.

Jordi Martínez
  • 418
  • 4
  • 11