Spring Cloud Stream Dispatcher has no subscribers Error.
After a successful spring boot container start up we need to put few notification messages on a Kafka topic and several of our microservices does same function and for this reason we wrote a common jar that contains out put channel definitions and dispatch utils. And the functionality works as expected as long as we invoke the util right after the SpringApplication.run call.
Following is one of our microservices Application class sample.
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context =SpringApplication.run(Application.class, args);
context.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
}
}
The above set up works as expected, however this puts unnecessary burden the developer to write the boiler template code on every microservice. So to avoid this, we wrote an Aspect implementation to do the same function, however with our aspect approach we are running into the following error.
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'schedulertestsvcs:dev:1180.scheduledJobExecutionResponseOutput'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
We tried several approaches like Spring SmartLifeCycle to get a handle on all Kafka Output/Input channel startup completion but all of them are running into the same error.
Following is our Aspect implementation on org.springframework.boot.SpringApplication.run(..)
@Aspect
@Component
public class SchedulerConsumerAspect {
@Autowired
protected ApplicationContext applicationContext;
@AfterReturning(value = "execution(* org.springframework.boot.SpringApplication.run(..))",returning = "result")
public void afterConsumerApplicationStartup(JoinPoint pjp, Object result) throws Throwable {
if(result!=null){
ConfigurableApplicationContext context=(ConfigurableApplicationContext) result;
if(context.containsBean("schedulerConsumerUtils")){
//For what ever reason the following call resulting in Dispatcher has no subscribers for channel error.
//TODO fix the above issue and enable the following call.
context.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
}
}
}
}
During our debug sessions, we found out org.springframework.boot.SpringApplication.run(..) Aspect was called several times during the bootstrap process. First when the aspect was called we got result value as null, after some time spring boot calls the same aspect this time result is not null. Even after getting result not null there is no grantee the component is completely initialized that's why you see a check for context.containsBean("schedulerConsumerUtils"). However after the bean initialization we are seeing output channels are not completely bound.
What is the best way to get handle on Spring Cloud Stream Kafka output/input channel binding completion ?
Why the component invocation works fine in SpringBoot Application but not through Aspect? I struggled on this few days couldn't find the right solution. Any help greatly appreciated.