22

Is there a way to suppress duplicate messages on a queue defined on an ActiveMQ server?

I tried to set the JMSMessageID manually (message.setJMSMessageID("uniqueid")), but the server ignores this change and delivers the message with its own generated JMSMessageID.

I didn't find anything in the specification about how to deduplicate messages.

In HornetQ, to deal with this problem, we need to set the HornetQ-specific property org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID on the message.

i.e.:

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
jmsMessage.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

Does anybody know if there's a similar solution for ActiveMQ?

gstackoverflow
Andre Pastore

6 Answers

7

You should look at Apache Camel. It provides an Idempotent Consumer component that works with any JMS provider; see: http://camel.apache.org/idempotent-consumer.html

Using that in combination with the ActiveMQ component makes using JMS quite simple, see: http://camel.apache.org/activemq.html

Tim Bish
  • 2
    I doubt this approach will solve my problem. I need to keep only one instance of a message with a given JMSMessageID while that instance is in the queue; I need it to work like a Set. I want to be able to put another message with the same JMSMessageID once the previous one has been removed from the queue. I need to implement and test it. But based on the idempotent pattern described in the EAI book, I think the concept doesn't match my need. Still, the proposed solution is good. I will study it further and comment here with my results. Thanks – Andre Pastore Feb 11 '11 at 01:31
5

I doubt that ActiveMQ supports this natively, but it should be easy to implement an idempotent consumer. One way to do this is to add a unique identifier to each message at the producer end. At the consumer end, a store (database, cache, etc.) is then checked to see whether the message has been received before, and processing continues or is skipped based on that check.
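
The check described above could be sketched roughly as follows. This is only an illustration under assumptions: the class and method names are hypothetical, the store is a plain in-memory set standing in for a database or cache, and the identifier is assumed to have been read from whatever property the producer set on the message.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers which message identifiers were already processed. */
class IdempotentConsumerSketch {
    // In-memory stand-in for the store (database, cache, etc.); contents are lost on restart.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler only the first time a given identifier is seen.
     * Set.add is atomic on this set, so two consumer threads in the same JVM
     * cannot both be told that the same identifier is new.
     *
     * @return true if the handler ran, false if the identifier was a duplicate
     */
    boolean processOnce(String uniqueId, Runnable handler) {
        if (!processedIds.add(uniqueId)) {
            return false; // already seen: skip processing
        }
        handler.run();
        return true;
    }
}
```

With several consumer processes, the in-memory set would have to be replaced by a shared store that offers an equivalent atomic "insert if absent" operation.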

There is a previous Stack Overflow question along the same lines, "Apache ActiveMQ 5.3 - How to configure a queue to reject duplicate messages?", that may also help.

Community
Biju Kunjummen
  • 1
    Since the consumer itself can be multi-threaded, one has to implement distributed or in-memory locking to decide whether a message is a duplicate. Right? – user1401472 Nov 17 '19 at 16:12
4

There is now support for removing duplicate messages baked into ActiveMQ transports. See the configuration values auditDepth and auditMaximumProducerNumber in the Connection Configuration Guide.
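
As a rough sketch, these options can be given as `jms.`-prefixed query parameters on the connection URI. The helper class below is hypothetical, the broker address `tcp://localhost:61616` is an assumption, and 2048 and 64 are simply the values mentioned in the discussion of this answer.

```java
/** Hypothetical helper that assembles a connection URI carrying the audit options. */
class AuditUriSketch {
    static String brokerUrl(String base, int auditDepth, int auditMaximumProducerNumber) {
        return base
                + "?jms.auditDepth=" + auditDepth
                + "&jms.auditMaximumProducerNumber=" + auditMaximumProducerNumber;
    }

    public static void main(String[] args) {
        // Assumed broker address; adjust to the actual transport and host.
        System.out.println(brokerUrl("tcp://localhost:61616", 2048, 64));
    }
}
```

The resulting string would then be handed to the client as its broker URL, for example when constructing an ActiveMQConnectionFactory.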

Chris Pitman
  • 4
    how do you actually configure those parameters to avoid duplicates? – Thomas Aug 26 '13 at 09:44
  • @Thomas I'm not sure what you're asking. How, in general, to apply configuration in ActiveMQ? Or what values to use for these specific fields? – Chris Pitman Aug 27 '13 at 02:36
  • It is just that the description of the parameters is not very clear to me. For `auditDepth`, for example, does the value mean the number of messages or the number of bytes that will be checked for duplicates? As for `auditMaximumProducerNumber`, does that mean only a limited number of producers will be screened? Btw, if a message with the same content is published by 2 different subscribers, is it considered a duplicate? – Thomas Aug 27 '13 at 06:15
  • 1
    @Chris IIUC these parameters guarantee detection of duplicates for 2048 messages each on up to 64 producers. But how does ActiveMQ determine what is a duplicate? If it's by JMSMessageID then we're back to square one because we can't set that. – Antares42 Jan 22 '15 at 13:53
3

There is a way to make ActiveMQ filter duplicates based on a JMS property. It involves writing an ActiveMQ plugin. A basic broker filter that sends duplicate messages to the dead letter queue would look like this:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;

public class DuplicateFilterBroker extends BrokerFilter {
    // Name of the JMS property whose value identifies a message.
    private final String messagePropertyName;
    // Property values already seen. Kept in memory and never evicted in this
    // sketch; a real plugin would need to bound or expire this set.
    private final Set<String> seenValues = ConcurrentHashMap.newKeySet();

    public DuplicateFilterBroker(Broker next, String messagePropertyName) {
        super(next);
        this.messagePropertyName = messagePropertyName;
    }

    // Returns true if this property value was already seen on an earlier message.
    public boolean hasDuplicate(String propertyValue) {
        return propertyValue != null && !seenValues.add(propertyValue);
    }

    @Override
    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception {
        Object msgObj = msg.getMessage();
        if (msgObj instanceof javax.jms.Message
                && hasDuplicate(((javax.jms.Message) msgObj).getStringProperty(messagePropertyName))) {
            // Duplicate: divert to the dead letter queue instead of the destination.
            // The parameters of sendToDeadLetterQueue differ between ActiveMQ
            // releases; check the Broker interface of the version in use.
            sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
        } else {
            // First occurrence, or no such property on the message: deliver normally.
            super.send(producerExchange, msg);
        }
    }
}
Nick Spacek
Monachus
  • How does this plugin decide which property is used to filter duplicate messages? An explanation of the use case would really help when integrating such a plugin. Thanks in advance for your answer. – Hayra Feb 26 '21 at 22:56
0

It seems the approach suggested in the question works for ActiveMQ too (as of 2016/12); see the activemq-artemis guide. It requires the producer to set a specific property on the message.

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id";   // Could use a UUID for this
jmsMessage.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

However, the class containing the property is different: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID, and the property value is _AMQ_DUPL_ID.

Gab
0

Have you tried setting jms.checkForDuplicates=true?

WesternGun