Update

I looked at the source code of MQTTAdapterListener in the carbon-analytics-common project (at this link).

The run() method sleeps the thread before it tries to connect, and I believe that is why the MQTT connection takes so long.

@Override
public void run() {
    while (!connectionSucceeded) {
        try {
            MQTTEventAdapterConstants.initialReconnectDuration = MQTTEventAdapterConstants.initialReconnectDuration
                    * MQTTEventAdapterConstants.reconnectionProgressionFactor;
            Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration);
            startListener();
            connectionSucceeded = true;
            log.info("MQTT Connection successful");
        } catch (InterruptedException e) {
            log.error("Interruption occurred while waiting for reconnection", e);
        } catch (MqttException e) {
            log.error("MQTT Exception occurred when starting listener", e);

        }

    }
}

initialReconnectDuration and reconnectionProgressionFactor are defined in MQTTEventAdapterConstants as follows:

public static int initialReconnectDuration = 10000;
public static final int reconnectionProgressionFactor = 2;

If I have 12 MQTT receivers, the 12th one will sleep for 40960 seconds, because initialReconnectDuration is a shared static field that every listener doubles before it sleeps. There seems to be no way to modify these two values. Is there any way to solve this issue? Why does the MQTT adapter sleep the thread in this way?
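To make the arithmetic concrete, here is a small standalone sketch that reproduces the effect of several listeners doubling one shared value before sleeping. It is not the WSO2 code: the class and method names are made up for illustration, and it assumes every listener starts at about the same time and doubles the value exactly once.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedBackoffDemo {
    // Same starting values as quoted from MQTTEventAdapterConstants.
    static final int INITIAL_RECONNECT_DURATION_MS = 10000;
    static final int RECONNECTION_PROGRESSION_FACTOR = 2;

    /** Returns the sleep time (ms) seen by each listener when all share one delay value. */
    static List<Long> sleepTimesMs(int listeners) {
        List<Long> result = new ArrayList<>();
        long shared = INITIAL_RECONNECT_DURATION_MS;
        for (int i = 0; i < listeners; i++) {
            // Each listener multiplies the shared value first, then sleeps for the result.
            shared *= RECONNECTION_PROGRESSION_FACTOR;
            result.add(shared);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> sleeps = sleepTimesMs(12);
        for (int i = 0; i < sleeps.size(); i++) {
            System.out.println("listener " + (i + 1) + ": " + (sleeps.get(i) / 1000) + " s");
        }
    }
}
```

Under these assumptions the first listener waits 20 s and the 12th waits 40960 s (about 11 h 22 min), which matches the gap between 01:07:59 and 12:30:39 in the log below.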


We are using WSO2 CEP version 4.0.0 and ActiveMQ version 5.13.0. The deployed event receiver is shown below:

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER"
    statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="mqtt">
        <property name="clientId">5718a6b1851cb3474c6f03c2</property>
        <property name="topic">PROBE_DATA_571844f5851cb3474c6f0391_57184581851cb3474c6f0394_Topic</property>
        <property name="cleanSession">false</property>
        <property name="url">tcp://127.0.0.1:1883</property>
    </from>
    <mapping customMapping="enable" type="json">
        <property>
            <from jsonPath="$.event.payloadData.dyna.Speed3"/>
            <to name="speed" type="double"/>
        </property>
    </mapping>
    <to streamName="5718a6b1851cb3474c6f03c2_PROBE_DATA_IS" version="1.0.0"/>
</eventReceiver>

The clientId is a random object ID of this event execution plan in our internal database. The topic exists in ActiveMQ, and I am sure that messages have been enqueued into it. The cleanSession property is set to false for a durable subscription.

However, the receiver always takes a long time to connect to the ActiveMQ topic, whether after restarting WSO2 CEP or after deploying a new receiver.

[2016-04-24 01:07:59,018]  INFO {org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService} -  Starting polling event receivers
[2016-04-24 01:07:59,019]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f66bcc7a22442328cd229a_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,020]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,021]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 57144fa0851cb33590672418_X_DATA_RECEIVER
[2016-04-24 01:07:59,022]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5714b5f4851cb3338c26a1cc_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,022]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 571b9ef4851cb356e04ec90b_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,023]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5714a305851cb3338c26a1c4_DELIVERY_ORDER_RECEIVER
[2016-04-24 01:07:59,023]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f6b03e7a22440c30855015_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f665f27a22442328cd2299_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5718a5d7851cb3474c6f03bf_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,025]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56ef98137a22442e28360c0b_DELIVERY_ORDER_RECEIVER
[2016-04-24 01:07:59,026]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 571747f2851cb3338c26a1da_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,026]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f66c657a22442328cd229c_PROBE_DATA_RECEIVER
[2016-04-24 01:08:19,044]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:08:39,041]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:09:19,034]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:10:39,037]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:13:19,046]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:18:39,035]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:29:19,038]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:50:39,036]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 02:33:19,039]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 03:58:39,043]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 06:49:19,038]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 12:30:39,040]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful

There are 12 receivers, and WSO2 starts polling at 01:07:59. After about 10 minutes, only 6 of the 12 receivers have connected successfully; the rest take much longer, and the last one needs more than 11 hours.

Does anybody know why the WSO2 CEP MQTT receiver connects so slowly?

Bruce

1 Answer

There is no out-of-the-box way to configure the reconnection duration and progression factor. But since you have already found the relevant code, you can patch that component (read those values from a config file, configure them through the adapter UI, or just hard-code different values) and apply the patch as described here to overcome this limitation.
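As a rough sketch of what such a patch could look like, the delay can be kept per listener instance instead of in a shared static field, and capped so it cannot grow without bound. ReconnectBackoff is a hypothetical helper, not a class from carbon-analytics-common, and how its three values are supplied (config file, adapter UI, hard-coded) is left open.

```java
/** Hypothetical per-listener backoff; not part of the WSO2 code base. */
public class ReconnectBackoff {
    private final long initialDelayMs;
    private final int factor;
    private final long maxDelayMs;
    private long nextDelayMs;

    public ReconnectBackoff(long initialDelayMs, int factor, long maxDelayMs) {
        if (initialDelayMs <= 0 || factor < 1 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("invalid backoff settings");
        }
        this.initialDelayMs = initialDelayMs;
        this.factor = factor;
        this.maxDelayMs = maxDelayMs;
        this.nextDelayMs = initialDelayMs;
    }

    /** Returns the delay to wait before the next attempt, then grows it up to the cap. */
    public synchronized long nextDelayMs() {
        long current = nextDelayMs;
        // Compare against maxDelayMs / factor first so the multiplication cannot overflow.
        nextDelayMs = (nextDelayMs > maxDelayMs / factor) ? maxDelayMs : nextDelayMs * factor;
        return current;
    }

    /** Call after a successful connection so later reconnects start from the initial delay. */
    public synchronized void reset() {
        nextDelayMs = initialDelayMs;
    }
}
```

Each listener would hold its own instance, for example new ReconnectBackoff(1000, 2, 60000), and sleep for nextDelayMs() only after a failed attempt, so one receiver's retries do not slow down the others.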

Rajeev Sampath
  • Hi Rajeev, I really appreciate your idea and answer. I checked that the WSO2 CEP I use is version 4.0.0, and the MQTT adapter in its repository\components\plugins is org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar. So I checked out the source code at tag v5.0.3 of the carbon-analytics-common project, modified the code, and built the org.wso2.carbon.event.input.adapter.mqtt module with Maven. Finally, I got an org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar! – Bruce Apr 26 '16 at 06:52
  • But I still cannot figure out how to patch WSO2 CEP. Is a WSO2 patch simply a .jar file placed in /repository/components/patches/? I have done that and restarted the WSO2 CEP server, but it does not work, with or without the -DapplyPatches argument. – Bruce Apr 26 '16 at 06:55
  • BTW, the jar I built is named org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar, while the one in the WSO2 CEP plugins folder is org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar. So I renamed my jar to org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar and put it into the patches folder. – Bruce Apr 26 '16 at 07:23
  • You'll need to rename the jar file to match the jar in the plugins folder. Also create a folder named something like patch0999 in /repository/components/patches and place your jar inside that folder. Then restart the server and the patch will be applied automatically. – Rajeev Sampath Apr 26 '16 at 08:16
  • If you want to check whether it was applied properly, compare the md5sum of the jar in patches/patch0999 with that of the jar in plugins after restarting the server. – Rajeev Sampath Apr 26 '16 at 08:18
  • Thanks for the answer. I renamed the jar and put it into a patch0001 folder, and it works perfectly! I also found that WSO2 just copies the jar into the plugins folder and backs up the original one to patch0000, so simply replacing the jar in the plugins folder also works. Thanks again. – Bruce Apr 26 '16 at 09:49
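
The md5sum comparison suggested in the comments can also be done from Java if the md5sum tool is not available, for example on Windows. This is only a sketch: JarChecksum is a hypothetical helper, and the two jar paths are passed as arguments rather than assumed.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper that compares the MD5 digests of two files. */
public class JarChecksum {
    /** Returns the MD5 digest of the given bytes as a lowercase hex string. */
    static String md5Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(data);
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: java JarChecksum <patched-jar> <plugins-jar>");
            System.exit(1);
        }
        String patched = md5Hex(Files.readAllBytes(Paths.get(args[0])));
        String deployed = md5Hex(Files.readAllBytes(Paths.get(args[1])));
        System.out.println(patched + "  " + args[0]);
        System.out.println(deployed + "  " + args[1]);
        System.out.println(patched.equals(deployed) ? "identical" : "different");
    }
}
```

Run it with the jar from the patch folder and the jar from the plugins folder as the two arguments; if the patch was applied, the two digests are equal.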