It is very important to understand that you need to call client.setCallback and implement MqttCallbackHandler in order to receive messages on the topics to which you subscribed.
Here is an example of code that can both publish and subscribe etc.
The following code initially publishes the mqtt topic and payload:
- Topic: AndroidPhone
- Payload: Hello, I am an Android Mqtt Client.
The code subscribes to the topic "tester". If it receives a message with topic "tester" and payload of "Alarm Activated" then it publishes the following topic and payload (via the callback mentioned above):
- Topic: Fitlet
- Payload: Hello, the Mosquitto Broker got your message saying that the
Alarm is Activated.
If you are using Mosquitto then the following command in the terminal would cause this message to be fired out:
mosquitto_pub -h 192.168.9.100 -t tester -m "Alarm Activated" -u fred -P 1234
Where my Mosquitto username is fred and my password is 1234
The Code:
package colin.android.mqtt;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.widget.Toast;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import java.io.UnsupportedEncodingException;
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
String clientId = MqttClient.generateClientId();
//The URL of the Mosquitto Broker is 192.168.9.100:1883
final MqttAndroidClient client = new MqttAndroidClient(this.getApplicationContext(), "tcp://192.168.9.100:1883", clientId);
client.setCallback(new MqttCallbackHandler(client));//This is here for when a message is received
MqttConnectOptions options = new MqttConnectOptions();
try {
options.setUserName("fred");
options.setPassword("1234".toCharArray());
IMqttToken token = client.connect(options);
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// We are connected
Log.d("mqtt", "onSuccess");
//-----------------------------------------------------------------------------------------------
//PUBLISH THE MESSAGE
MqttMessage message = new MqttMessage("Hello, I am an Android Mqtt Client.".getBytes());
message.setQos(2);
message.setRetained(false);
String topic = "AndroidPhone";
try {
client.publish(topic, message);
Log.i("mqtt", "Message published");
// client.disconnect();
//Log.i("mqtt", "client disconnected");
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//-----------------------------------------------------------------------------------------------
String subtopic = "tester";
int qos = 1;
try {
IMqttToken subToken = client.subscribe(subtopic, qos);
subToken.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// The message was published
Log.i("mqtt", "subscription success");
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
// The subscription could not be performed, maybe the user was not
// authorized to subscribe on the specified topic e.g. using wildcards
Log.i("mqtt", "subscription failed");
}
});
} catch (MqttException e) {
e.printStackTrace();
}
//---------------------------------------------------------------------------
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// Something went wrong e.g. connection timeout or firewall problems
Log.d("mqtt", "onFailure");
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}//End of Activity class
//-----------------------------------------------------------------------------
class MqttCallbackHandler implements MqttCallbackExtended {
private final MqttAndroidClient client;
public MqttCallbackHandler (MqttAndroidClient client)
{
this.client=client;
}
@Override
public void connectComplete(boolean b, String s) {
Log.w("mqtt", s);
}
@Override
public void connectionLost(Throwable throwable) {
}
public void AlarmActivatedMessageReceived()
{
MqttMessage msg= new MqttMessage("Hello, the Mosquitto Broker got your message saying that the Alarm is Activated.".getBytes());
try {
this.client.publish("Fitlet", msg);
Log.i("mqtt", "Message published");
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
Log.w("mqtt", mqttMessage.toString());
if (mqttMessage.toString().contains("Alarm Activated"))
{
AlarmActivatedMessageReceived();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}