0

I am trying to use Spring Integration MQTT to build a client that subscribes to an MQTT broker. The code works as expected, with no issues. However, I am struggling to configure it so that it resubscribes automatically when the connection is lost. What happens now is that after a disconnect the connection is re-established, but the client is no longer subscribed to my topic.

What should I do to capture the event correctly, and resubscribe again when connection is lost?

Here is my configuration


import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * Spring configuration that wires an MQTT inbound adapter (subscriber) and an
 * MQTT outbound handler (publisher) on top of a shared Paho client factory.
 *
 * <p>Inbound messages flow: broker -> {@link MqttPahoMessageDrivenChannelAdapter}
 * -> {@code mqttInputChannel} -> {@link #handler()}.
 * Outbound messages flow: {@code mqttOutboundChannel} -> {@link #mqttOutbound}.
 */
@Configuration
public class MqttBeans {

    private static final Logger logger = LoggerFactory.getLogger(MqttBeans.class);

    /**
     * Builds the Paho connection options shared by the inbound and outbound clients.
     *
     * @return connection options with a persistent session and Paho automatic reconnect enabled
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {

        MqttConnectOptions options = new MqttConnectOptions();
        // NOTE(review): broker URL and credentials are hard-coded placeholders;
        // they should come from externalized configuration, not source code.
        options.setServerURIs(new String[] { "ssl://URL:8883" });
        options.setUserName("ubidot_bridge");
        String pass = "PASS";
        options.setPassword(pass.toCharArray());

        // Persistent session: the broker is expected to remember subscriptions for a
        // given client id across reconnects.
        // NOTE(review): if the broker drops the session (e.g. a forced disconnect),
        // nothing in this class re-subscribes explicitly -- confirm how the Spring
        // Integration MQTT version in use behaves together with automaticReconnect.
        options.setCleanSession(false);
        options.setAutomaticReconnect(true);

        options.setConnectionTimeout(30); // seconds
        options.setKeepAliveInterval(90); // seconds
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

        return options;
    }

    /**
     * Creates the Paho client factory used by both the inbound adapter and the outbound handler.
     *
     * @param options connection options applied to every client created by the factory
     * @return the configured client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

        factory.setConnectionOptions(options);
        factory.setConsumerStopAction(ConsumerStopAction.UNSUBSCRIBE_NEVER);
        // This method runs once at bean creation; no connection exists yet, so do not
        // log anything that suggests a (re)connect happened.
        logger.info("MQTT client factory configured");

        return factory;
    }

    /**
     * Channel that receives messages produced by the inbound MQTT adapter.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the inbound adapter that subscribes to every topic ({@code "#"}) with QoS 2
     * using the fixed client id {@code ubidot_bridge_in}, and forwards messages to
     * {@link #mqttInputChannel()}.
     *
     * @param options connection options passed through to {@link #mqttClientFactory}
     * @return the configured inbound adapter
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapterConfig(MqttConnectOptions options) {

        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("ubidot_bridge_in",
                mqttClientFactory(options), "#");

        adapter.setCompletionTimeout(5000); // milliseconds
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        logger.info("Setting up inbound channel");
        return adapter;
    }

    /**
     * Exposes the inbound adapter under the {@link MessageProducer} type.
     * Returns the very same instance it is given; no second adapter is created.
     *
     * @param adapter the inbound adapter bean
     * @return {@code adapter}, unchanged
     */
    @Bean
    public MessageProducer inbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        return adapter;
    }

    /**
     * Handler for messages arriving on {@code mqttInputChannel}; logs the received
     * topic and the payload.
     *
     * @return the message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {

        logger.info("Setting up msg receiver handler");

        return message -> {
            // The header lookup may return null; let the logger render it instead of
            // calling toString() on it and risking a NullPointerException.
            Object topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);

            logger.info("Msg received .. Topic: {}", topic);
            logger.info("Payload {}", message.getPayload());
        };
    }

    /**
     * Channel whose messages are published to the broker by {@link #mqttOutbound}.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the outbound handler that publishes messages from {@code mqttOutboundChannel}
     * asynchronously, using the fixed client id {@code ubidot_bridge_out}.
     *
     * @param options connection options passed through to {@link #mqttClientFactory}
     * @return the configured outbound handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttConnectOptions options) {

        // The client id is a fixed literal (not randomly generated); it must differ from
        // the inbound adapter's id because a broker allows one connection per client id.
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("ubidot_bridge_out", mqttClientFactory(options));
        messageHandler.setAsync(true);
        // NOTE(review): "#" is a subscription wildcard and is not a valid topic name for
        // publishing; messages that rely on this default (no topic header) are expected
        // to be rejected. Replace with a concrete topic -- value left as-is because the
        // intended topic is not visible here.
        messageHandler.setDefaultTopic("#");
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }
}

Thank you in advance for the help

T.

tjxob
  • 19
  • 6
  • If you are going to set clean session to false you will need to set a client id as well – hardillb Feb 14 '23 at 07:23
  • Could you please elaborate more? Thnks – tjxob Feb 14 '23 at 17:06
  • Typo, I meant `client id`. For clean session false to be useful you need to ensure the same client id is used every time it connects, otherwise you just leak sessions. – hardillb Feb 14 '23 at 17:12
  • Thank you @hardillb. I am using the same client_id, as shown in the code. What I am noticing is that if the Java code disconnects itself, it maintains the subscriptions. But if I kick it out from the broker, it does not maintain the subscriptions. In your experience, is there anything I can do to resubscribe? – tjxob Feb 14 '23 at 17:34
  • Where are you setting the client id, I couldn't find it, hence the comment? – hardillb Feb 14 '23 at 18:04
  • Here: new MqttPahoMessageDrivenChannelAdapter("ubidot_bridge_in", mqttClientFactory(options), "#"). The client_id is: ubidot_bridge_in – tjxob Feb 14 '23 at 18:11

0 Answers0