Config

@Configuration
@IntegrationComponentScan
public class MqttV5ChannelConfig {
    @Bean
    public MqttConnectionOptions getMqttConnectionOptions() throws Exception {
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setUserName(xxx);
        options.setPassword(xxx);
        options.setServerURIs(xxx);
        options.setAutomaticReconnect(true);
        options.setCleanStart(false);
        return options;
    }
    @Bean
    public MessageProducer inbound(MqttConnectionOptions options) {
        Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, "Client_" + System.currentTimeMillis(), "test");
        adapter.setCompletionTimeout(5000);
        adapter.setQos(0);
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setErrorChannel(errorChannel());
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttConnectionOptions options) {
        Mqttv5PahoMessageHandler handler = new Mqttv5PahoMessageHandler(options, "Client_" + System.currentTimeMillis());
        handler.setAsync(true);
        handler.setDefaultTopic("test");
        return handler;
    }
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageChannel errorChannel(){
        return new DirectChannel();
    }
}

Error log

org.springframework.context.ApplicationContextException: Failed to start bean 'mqttV5ChannelConfig.mqttOutbound.serviceActivator'; nested exception is java.lang.IllegalStateException: Cannot connect 'MqttAsyncClient' for: mqttOutbound
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)

Question

I want startup not to fail when the network is disconnected. The client should reconnect automatically once the network is available again, or when I publish a message.

How can I configure this?

noodles

1 Answer

In the inbound bean, adapter.setAutoStartup(false) will prevent the context from starting the adapter during initialization.

You can then start() the adapter manually (in a try/catch block, presumably).
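
The try/catch idea can be sketched without any Spring types. The `DeferredStarter` class below is a hypothetical helper (it is not part of Spring Integration or Paho), and it assumes that the adapter's `start()` throws a runtime exception while the broker is unreachable. It attempts the start once and, if that fails, schedules another attempt after a fixed delay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Spring Integration or Paho.
public class DeferredStarter {

    // Daemon thread, so pending retries never block JVM shutdown.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "deferred-start");
                thread.setDaemon(true);
                return thread;
            });

    /** Runs startAction once; returns true if it completed without throwing. */
    public static boolean tryStart(Runnable startAction) {
        try {
            startAction.run();
            return true;
        } catch (RuntimeException ex) {
            // Assumption: a failed connection surfaces as a runtime exception.
            System.err.println("Start failed, will retry: " + ex.getMessage());
            return false;
        }
    }

    /** Retries startAction every delayMillis until one attempt succeeds. */
    public void startWithRetry(Runnable startAction, long delayMillis) {
        if (!tryStart(startAction)) {
            scheduler.schedule(
                    () -> startWithRetry(startAction, delayMillis),
                    delayMillis, TimeUnit.MILLISECONDS);
        }
    }
}
```

With the configuration from the question, one possible use is to call `adapter.setAutoStartup(false)` in the `inbound` bean and then, once the context is up (for example from an `ApplicationRunner`), call `new DeferredStarter().startWithRetry(adapter::start, 5000)`. The 5000 ms delay is an arbitrary illustrative value.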

Gary Russell
  • I think we have a bug over there in the `Mqttv5PahoMessageHandler`: we must not re-throw an exception, just emit an event and log a message. The rest of the logic should rely on `options.setAutomaticReconnect(true)`, which is handled internally in the background. I will look into that today together with https://github.com/spring-projects/spring-integration/issues/3697 – Artem Bilan Dec 16 '21 at 14:39
  • The fix is here: https://github.com/spring-projects/spring-integration/pull/3698 – Artem Bilan Dec 16 '21 at 17:23
  • That's great. Thanks. – noodles Dec 23 '21 at 01:57