I am trying to combine two features of HiveMQ: shared subscriptions and persistent sessions.
I have created a very simple message producer and a very simple consumer. When running multiple consumers, all consumers receive all messages.
After setting cleanSession to 'false' for the consumers, when I run a consumer and then restart it, the consumer also receives the messages that were published while it was not connected. Excellent.
Now I combine this with the shared subscription feature. When using only shared subscriptions, with cleanSession set to 'true', and running multiple consumers, each message is received by only a single consumer. Delivery should be round-robin, and it is, but as soon as you stop a consumer the messages are no longer distributed round-robin: one of the consumers gets significantly more messages than the other(s).
If I now enable persistent sessions again (cleanSession is 'false') and start the shared subscription consumers, the consumers start to receive all messages again, instead of each message being delivered to just one client.
What is the issue here? Is this a bug in HiveMQ? Can persistent sessions and shared subscriptions not be used together? That would really be a bummer.
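For context, a shared subscription is requested through the topic filter itself, so the consumer code stays the same and only the `topic` argument changes. Below is a minimal sketch of building such a filter, assuming the `$share:GROUP:TOPIC` form documented for HiveMQ 3.x (MQTT 5 and newer HiveMQ versions use `$share/GROUP/TOPIC` instead). The `SharedTopic` helper and the group name `group1` are hypothetical and only serve as illustration:

```scala
// Hypothetical helper, not part of HiveMQ or Paho.
object SharedTopic {
  // Assumed HiveMQ 3.x syntax: $share:GROUP:TOPIC
  def colonStyle(group: String, topic: String): String = {
    require(group.nonEmpty && !group.contains(":"), "group must be non-empty and contain no ':'")
    "$share:" + group + ":" + topic
  }

  // MQTT 5 syntax: $share/GROUP/TOPIC
  def slashStyle(group: String, topic: String): String = {
    require(group.nonEmpty && !group.contains("/"), "group must be non-empty and contain no '/'")
    "$share/" + group + "/" + topic
  }
}
```

All consumers that should share the load would subscribe with the same group name; a consumer subscribing to the plain topic instead receives every message.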
UPDATE 15/2/2017 As @fraschbi suggested I cleared all data and retested the shared subscription with persistent session consumers again. It seems to work!
What is strange, though, is that the missed messages are only received once the 1st consumer reconnects. All consumers run the same code; they're just started with different clientId arguments. See code below. My test sequence:
- started consumer1: all messages go to this consumer.
- started consumer2: each consumer receives every other message.
- started consumer3: each consumer gets 1 in 3 messages.
- stopping consumer1: now consumer2 and 3 receive every other message. (I don't know why I saw the uneven distribution yesterday, but maybe, as @fraschbi mentioned, it was because I was reusing clientIds and did not unsubscribe or properly disconnect.)
- now stopping consumer2: all messages now received by consumer3.
- stopping consumer3: no messages received anymore.
- restarting consumer3: it continues with the first message the producer sends. It does not receive the lost messages.
- restart consumer2: messages are evenly distributed again.
- restart consumer1: THIS one now receives all lost messages and then continues to receive 1 in every 3 messages.
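To make the expected distribution in the steps above concrete, an ideal round-robin over the currently connected group members can be modelled as below. This is only a toy model of what I expect to see, not HiveMQ's actual dispatch logic, and it deliberately does not model queueing for offline persistent sessions. The `RoundRobinModel` object is hypothetical:

```scala
// Toy model: assign message i to consumer (i mod n). Not HiveMQ code.
object RoundRobinModel {
  def distribute(consumers: Vector[String], messages: Seq[Int]): Map[String, Vector[Int]] = {
    require(consumers.nonEmpty, "at least one connected consumer is needed")
    // Start every consumer with an empty inbox so idle consumers still appear in the result.
    val empty = consumers.map(_ -> Vector.empty[Int]).toMap
    messages.zipWithIndex.foldLeft(empty) { case (acc, (msg, i)) =>
      val target = consumers(i % consumers.size)
      acc.updated(target, acc(target) :+ msg)
    }
  }
}
```

With three connected consumers and messages 0 to 5, each consumer gets every third message; with two consumers, every other message.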
So my new question is: why does only the 1st consumer receive the lost messages?
Note: the trick here is still not to unsubscribe when stopping the client, because then the subscription/persistence setting is lost!
Producer.scala
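import java.util.UUID
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}

object Producer extends App {
  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"
  // A random clientId is fine here: the producer does not need a persistent session.
  val clientId = UUID.randomUUID().toString
  val client = new MqttClient(brokerUrl, clientId)
  client.connect()
  val theTopic = client.getTopic(topic)
  var count = 0

  // Disconnect cleanly when the JVM is stopped.
  sys.addShutdownHook {
    println("Disconnecting client...")
    client.disconnect()
    println("Disconnected.")
  }

  // Publish one numbered message per second until the process is stopped.
  while (true) {
    val msg = new MqttMessage(s"Message: $count".getBytes())
    theTopic.publish(msg)
    println(s"Published: $msg")
    Thread.sleep(1000)
    count += 1
  }
}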
Consumer.scala
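import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttClient, MqttConnectOptions, MqttMessage}

object Consumer extends App {
  val topic = args(0)
  val brokerUrl = "tcp://localhost:1883"
  // A fixed clientId is required so the broker can match the persistent session on reconnect.
  val clientId = args(1)
  // val clientId = UUID.randomUUID().toString
  val client = new MqttClient(brokerUrl, clientId)

  client.setCallback(new MqttCallback {
    override def deliveryComplete(token: IMqttDeliveryToken): Unit = ()
    override def messageArrived(topic: String, message: MqttMessage): Unit =
      println(s"received on topic '$topic': ${new String(message.getPayload)}")
    override def connectionLost(cause: Throwable): Unit = println("Connection lost")
  })

  println(s"Start $clientId consuming from topic: $topic")
  val options = new MqttConnectOptions()
  // cleanSession = false asks the broker for a persistent session.
  options.setCleanSession(false)
  client.connect(options)
  client.subscribe(topic)

  sys.addShutdownHook {
    println("Disconnecting client...")
    // Deliberately not unsubscribing, so the subscription survives the disconnect.
    // client.unsubscribe(topic)
    client.disconnect()
    println("Disconnected.")
  }

  // Keep the main thread alive without busy-spinning; messages arrive via the callback.
  while (true) {
    Thread.sleep(1000)
  }
}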