I am trying to using Spring MQTT Integration to build a client that is subscribe to MQTT broker. The code works as expected, no issues. I am struggling configuring it so that when the connection is lost, it subscribes automatically. What is happening now, is that when it disconnects, the connection is established but no is not subscribed anymore to my topic.
What should I do to capture the event correctly, and resubscribe again when connection is lost?
Here is my configuration
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@Configuration
public class MqttBeans {
Logger logger = LoggerFactory.getLogger(MqttBeans.class);
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "ssl://URL:8883" });
options.setUserName("ubidot_bridge");
String pass = "PASS";
options.setPassword(pass.toCharArray());
options.setCleanSession(false);
options.setAutomaticReconnect(true);
options.setConnectionTimeout(30);
options.setKeepAliveInterval(90);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
return options;
}
@Bean
public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions( options );
factory.setConsumerStopAction(ConsumerStopAction.UNSUBSCRIBE_NEVER);
logger.info("Reconnected to the broker");
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapterConfig(MqttConnectOptions options) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("ubidot_bridge_in",
mqttClientFactory(options), "#");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
logger.info("Setting up inbound channel");
return adapter;
}
@Bean
public MessageProducer inbound(MqttPahoMessageDrivenChannelAdapter adapter) {
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
logger.info("Setting up msg receiver handler");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
logger.info("Msg received .. Topic: " + topic);
logger.info("Payload " + message.getPayload());
System.out.println();
}
};
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound( MqttConnectOptions options ) {
// clientId is generated using a random number
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("ubidot_bridge_out", mqttClientFactory(options));
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("#");
messageHandler.setDefaultRetained(false);
return messageHandler;
}
}
Thank you in advance for the help
T.