3

We are using Spring message-driven-channel-adapter to subscribe MQTT topic. But we are getting below error very frequently. I have tested connection using JavaScript client(mqttws31.js), which is working fine. Means there is no issue with connection.

Error :-

org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost
SEVERE: Lost connection:Connection lost; retrying...

MQTT message :-

[payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0, 
id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}]

Configuration :-

<bean id="clientFactory"
    class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}" />
    <property name="password" value="${mqtt.password}" />
</bean>

<int-mqtt:message-driven-channel-adapter
    id="mqttInbound" client-id="${mqtt.default.client.id}" url="${mqtt.url}"
    topics="${topics}" client-factory="clientFactory" auto-startup="true"
    channel="output" error-channel="errorChannel" />


<int:channel id="output" />
<int:channel id="errorChannel" />

<int:service-activator input-channel="errorChannel"
    ref="errorMessageLogger" method="logError" />
<bean id="errorMessageLogger" class="com.mqtt.ErrorMessageLogger" />

<int:service-activator input-channel="output"
    method="handleMessage" ref="mqttLogger" />
<bean id="mqttLogger" class="com.mqtt.MqttReciever" />

pom.xml :

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>4.2.2.RELEASE</version>
</dependency>

While debugging org.eclipse.paho.client.mqttv3-1.1.1-sources.jar:-

CommsReceiver.Java

public void run() {
        final String methodName = "run";
        MqttToken token = null;

        while (running && (in != null)) {
            try {
                //@TRACE 852=network read message
                log.fine(CLASS_NAME,methodName,"852");
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage();
                receiving = false;

                // instanceof checks if message is null
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message);
                    if (token!=null) {
                        synchronized (token) {
                            // Ensure the notify processing is done under a lock on the token
                            // This ensures that the send processing can complete  before the
                            // receive processing starts! ( request and ack and ack processing
                            // can occur before request processing is complete if not!
                            clientState.notifyReceivedAck((MqttAck)message);
                        }
                    } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                        //This is an ack for a message we no longer have a ticket for.
                        //This probably means we already received this message and it's being send again
                        //because of timeouts, crashes, disconnects, restarts etc.
                        //It should be safe to ignore these unexpected messages.
                        log.fine(CLASS_NAME, methodName, "857");
                    } else {
                        // It its an ack and there is no token then something is not right.
                        // An ack should always have a token assoicated with it.
                        throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                    }
                } else {
                    if (message != null) {
                        // A new message has arrived
                        clientState.notifyReceivedMsg(message);
                    }
                }
            }
            catch (MqttException ex) {
                //@TRACE 856=Stopping, MQttException
                log.fine(CLASS_NAME,methodName,"856",null,ex);
                running = false;
                // Token maybe null but that is handled in shutdown
                clientComms.shutdownConnection(token, ex);
            }
            catch (IOException ioe) {
                //@TRACE 853=Stopping due to IOException
                log.fine(CLASS_NAME,methodName,"853");

                running = false;
                // An EOFException could be raised if the broker processes the
                // DISCONNECT and ends the socket before we complete. As such,
                // only shutdown the connection if we're not already shutting down.
                if (!clientComms.isDisconnecting()) {
                    clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                }
            }
            finally {
                receiving = false;
            }
        }

        //@TRACE 854=<
        log.fine(CLASS_NAME,methodName,"854");
    }

In above method, sometime in.readMqttWireMessage() throw IOException. So based on catch block it reconnect using clientComms.shutdownConnection(token, ...

HybrisHelp
  • 5,518
  • 2
  • 27
  • 65
  • 1
    Your question is not clear. If the connection was lost, it means there was, er, a problem with the connection, perhaps due to a network error. The adapter will attempt to reconnect. – Gary Russell Jul 03 '17 at 14:28
  • Please find updated question. I have tested connection with JavaScript client, which is working fine. – HybrisHelp Jul 04 '17 at 05:33

3 Answers3

8

Just wanted to share in case it helps... I had the same exception and fixed it by ensuring a unique client ID was generated (with MqttAsyncClient.generateClientId()), as mentioned here: https://github.com/eclipse/paho.mqtt.java/issues/207#issuecomment-338246879

wrapperapps
  • 937
  • 2
  • 18
  • 30
3

But you are still not really describing a problem. You show a message above so it must be working for you. Paho is detecting a connection problem; it informs Spring Integration which will reconnect.

You can get complete information about the exception by adding an ApplicationListener to your application.

@Bean
public ApplicationListener<?> eventListener() {
    return new ApplicationListener<MqttConnectionFailedEvent>() {

        @Override
        public void onApplicationEvent(MqttConnectionFailedEvent event) {
            event.getCause().printStackTrace();
        }

    };
}

Result:

Connection lost (32109) - java.io.EOFException
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:164)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:116)
    ... 1 more

(when I shut down the broker).

If you think there's a problem with the paho client, you should raise an issue for that project.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • `But you are still not really describing a problem` - Problem was frequently "Connection lost; retrying...", which I was not expecting. But now its working fine. I have used latest mqtt(4.3.10.RELEASE) and did maven clean/install. Now it is working as expected. There is no connection lost error. Thanks for you time. – HybrisHelp Jul 05 '17 at 08:12
0

MqttAsyncClient.generateClientId() Should be added in the Message Producer and MessageHandler method

public MessageProducer inbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MqttAsyncClient.generateClientId(),
            mqttClientFactory(), "#");

    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(2);
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
}


@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(MqttAsyncClient.generateClientId(), mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("#");
    messageHandler.setDefaultRetained(false);
    return messageHandler;
}