0

I have an application that connects to a pub/sub and processes messages when the pub/sub subscription publishes. I want to be able to put these messages into a Queue Channel to avoid processing loads of messages at once. However, when i try to add a queue channel i get the below error ? So the way i see it, a message arrives in the inboundChannelAdaptor, outputs the message to the Queue channel, then the messageReciever pulls and actions the messages in the QueueChannel ?

java.lang.IllegalArgumentException: No poller has been defined for Annotation-based endpoint, and no default poller is available within the context.
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.configurePollingEndpoint(AbstractMethodAnnotationPostProcessor.java:435) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.doCreateEndpoint(AbstractMethodAnnotationPostProcessor.java:377) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.createEndpoint(AbstractMethodAnnotationPostProcessor.java:367) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:172) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[na:na]
    at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:914) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at Application.main(Application.java:65) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.3.3.RELEASE.jar:2.3.3.RELEASE]

Here is my implementation:

@Bean
public MessageChannel inputMessageQueueChannel() {
    return new QueueChannel(50);
}

@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageQueueChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(pubSubTemplate,'sub-one');
    adapter.setOutputChannel(messageChannel);
    adapter.setAckMode(AckMode.MANUAL);
    adapter.setPayloadType(String.class);
    return adapter;
}


@ServiceActivator(inputChannel = "inputMessageQueueChannel")
public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
    log.info("Payload: " + payload);
    message.ack();
}

1 Answers1

0

The queue channel does nothing by itself unless it stores sent messages into a message store internally. To be able to receive messages from this queue you need to poll such a queue periodically. For this purpose the PollingConsumer pattern is used, but it cannot do anything by itself: you need to say it how to poll that queue. So, the poller must be provided. See the poller attribute of that @ServiceActivator annotation. Or you can provide a global default one via PollerMetadata bean definition with a PollerMetadata.DEFAULT_POLLER name. See more info in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer.

See also this question and its answers: Spring Integration No poller has been defined for endpoint

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • So essentially the serivceActivator will poll the queue channel for messages, while the queue channel is populated by the inboundChannelAdapter ? –  Sep 15 '21 at 11:47
  • 1
    Correct. The producer really doesn’t care about the channel type it sends messages to. The consumer behavior depends on the channel type. Anyway it is not clear why is the question since you have a correct config. Only what you miss is a poller – Artem Bilan Sep 15 '21 at 12:14