9

We have an ActiveMQ broker that's connected to from very different clients using JMS, AMQP, and MQTT. For some reason we haven't figured out yet a specific set of MQTT clients often (not always) subscribes durably. This is a test environment where clients are added and removed quite often, the latter sometimes by pulling the plug or rebooting an embedded device, so that they cannot properly unsubscribe. The effect (IIUC) is that the broker piles up "offline durable subscription" for devices which it might never see again (I can see these under http://my_broker:8161/admin/subscribers.jsp), keeping messages on those topics forever, until it finally breaks down under its own memory footprint.

The issue at hand here is that the subscribers subscribe durably, and we need to find out why that's the case. However, it was also decided that clients doing this (unwittingly) shouldn't bring the broker to a grinding halt, so we need to solve this problem independently.

I have found there are settings for a timeout for offline durable subscriptions and put those into our broker configuration (last two lines):

<broker
  xmlns="http://activemq.apache.org/schema/core" 
  brokerName="my_broker"
  dataDirectory="${activemq.data}" 
  useJmx="true"
  advisorySupport="false" 
  persistent="false"
  offlineDurableSubscriberTimeout="1800000"
  offlineDurableSubscriberTaskSchedule="60000">

If I understand correctly, the above should check every minute and dismiss clients it hasn't seen for half an hour. However, contrary to the docs, this doesn't seem to work: A consumer I had subscribe and then pulled the plug on days ago is still visible in the list of offline durable subscribers, the broker's memory footprint is constantly increasing, and if I delete subscribers manually in the broker's web interface I can see the memory footprint going down.

So here's my questions:

  1. What determines whether a MQTT subscription to a topic on an ActiveMQ broker is durable?
  2. What am I doing wrong in setting up the timeout for dropping offline durably subscriptions in the ActiveMQ settings?
sbi
  • 219,715
  • 46
  • 258
  • 445
  • 1
    Did you try the other way around, by publishing messages with a short TTL period (time to live) and by configuring a short **expireMessagesPeriod**? According to docs, with this configuration, the system has to purge all such messages after expiriy of TTL-period, immaterial of long lost durable subscribers (who didn't unsubscribe). That should also help us in releasing memory resources, because the actual memory consumed is in storing "messages", and not for storing the subscribers object themselves. – blackpen Nov 19 '16 at 02:47

1 Answers1

3

I extracted the relevant code (doCleanup()) that removes timed out durable subscriptions.

In success case, it executes:

    LOG.info("Destroying durable subscriber due to inactivity: {}", sub);

In failure case, it executes:

    LOG.error("Failed to remove inactive durable subscriber", e);

Look for above log line in your log file and match it with details that you observed using admin/subscribers.jsp viewer. If it doesn't print any of the lines, the subscriptions might be remaining active for some reason or you may have stumbled into a bug.

Also, could you try to remove the underscore (_) in broker name if you can? The manual talks about problems with underscores in broker names.

Code:

public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
   super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
   if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
      this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
      this.cleanupTask = new TimerTask() {
         @Override
         public void run() {
            doCleanup();
         }
      };
      this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
   }
}

public void doCleanup() {
   long now = System.currentTimeMillis();
   for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
      DurableTopicSubscription sub = entry.getValue();
      if (!sub.isActive()) {
         long offline = sub.getOfflineTimestamp();
         if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
            LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
            try {
               RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
               info.setClientId(entry.getKey().getClientId());
               info.setSubscriptionName(entry.getKey().getSubscriptionName());
               ConnectionContext context = new ConnectionContext();
               context.setBroker(broker);
               context.setClientId(entry.getKey().getClientId());
               removeSubscription(context, info);
            } catch (Exception e) {
               LOG.error("Failed to remove inactive durable subscriber", e);
            }
         }
      }
   }
}

// The toString method for DurableTopicSubscription class
@Override
public synchronized String toString() {
    return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
}
blackpen
  • 2,339
  • 13
  • 15
  • 1
    Thanks for answering this. Our professional support option turned up with two facts: _1)_ There's indeed a bug in ActiveMQ where it fails to drop offline durable subscribers. _2)_ In order to subscribe non-durably from MQTT you need to connect with `cleanSession` set to `true` and with a QoS < 1. – sbi Nov 23 '16 at 12:02