I have an ActiveMQ configuration with a virtual destination and a normal topic.

I want to route all JMS messages sent to the destination (VirtualTopic.Notifications) to two queues (VirtualTopic.SMS, VirtualTopic.EMAIL) based on the JMSType in the message header.

I also want the normal topic (VirtualTopic.gps) to work as usual.

This is my activemq.xml configuration. It creates the queues Consumer.SMS.VirtualTopic and Consumer.EMAIL.VirtualTopic.

    <destinations>
        <queue physicalName="Consumer.SMS.VirtualTopic" />
        <queue physicalName="Consumer.EMAIL.VirtualTopic" />
    </destinations>

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

The consumer and the topic (VirtualTopic.gps) are created from the server-side code:

    // Returns a cached producer for the topic, creating it on first use.
    private static MessageProducer getTopicProducer(String topicName) throws JMSException {
        MessageProducer producer = topicProducers.get(topicName);

        if (producer == null) {
            logger.info("Creating message producer for Topic : {}", topicName);
            Destination destination = session.createTopic(topicName);

            // Create the consumer queues named in the properties file by
            // briefly opening (and closing) a consumer on each of them.
            List<String> queueNames = PropertyReader
                    .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
            if (queueNames != null) {
                for (String queueName : queueNames) {
                    Queue virtualQueue = session.createQueue(queueName);
                    MessageConsumer con = session.createConsumer(virtualQueue);
                    con.close();
                }
            }

            producer = session.createProducer(destination);
            topicProducers.put(topicName, producer);
        }

        return producer;
    }

All messages sent to VirtualTopic.Notifications are routed to the two queues, and consumers can pick up messages from their respective queues.

The issue is that all messages sent to VirtualTopic.gps are filtered as well, and the consumers can't consume the gps messages.

Sabya
  • Firstly, you are talking about topics but I do not see any topic in your config; all your destinations are queues. To use topics you have to update your config with and use – Hassen Bennour Jan 18 '17 at 10:19
  • Which consumers and which destinations are you talking about? **But consumers for other topics are not picking up the messages.** – Hassen Bennour Jan 18 '17 at 10:22
  • Revert to which config? **When, I revert my configuration, the consumers for topics other than SMS, EMAIL is working.** – Hassen Bennour Jan 18 '17 at 10:36
  • Hassen Bennour: Hey, I meant all the messages are going through the and getting filtered. But the desired behaviour is that only the messages posted to VirtualTopic.Notifications should be filtered to SMS and EMAIL. I have one more topic, VirtualTopic.gps, for which the consumers are not picking up messages – Sabya Jan 18 '17 at 10:49
  • How did you configure this VirtualTopic.gps? Do you send messages to it as a destination? – Hassen Bennour Jan 18 '17 at 11:09
  • Can you post the config for the problem you are talking about? – Hassen Bennour Jan 18 '17 at 11:15
  • @Hassen: I am Configuring the topic from server side code. And am not sending it to the destination. I'm sending it like a normal topic. – Sabya Jan 18 '17 at 12:03
  • The other topic(VirtualTopic.gps) is created from code which follows: Destination destination = session.createTopic(topicName); Queue virtualQueue = session.createQueue(queueName); Consumers are not able to consume from this topic(VirtualTopic.gps) once i have the tags enabled for the VirtualTopic.Notification destination. – Sabya Jan 18 '17 at 12:11
  • the is only enabled for the VirtualTopic.Notification destination. Can you update your question with the code of the producers and consumers and activemq.xml? Please use the code formatter rather than pictures, so it is easier to read – Hassen Bennour Jan 18 '17 at 12:17
  • @Hassen Bennour done. can you please help me. Am stuck with the issue. – Sabya Jan 18 '17 at 14:39
  • What is the purpose of this code? `if (queueNames != null) { for (String queueName : queueNames) { Queue virtualQueue = session.createQueue(queueName); MessageConsumer con = session.createConsumer(virtualQueue); con.close(); } }` – Hassen Bennour Jan 18 '17 at 14:56
  • try my code snippet of updated answer to send and receive from this topic, there is no problem, **as i said the messages sent to VirtualTopic.gps are not filtered because there is no filter for this topic** – Hassen Bennour Jan 18 '17 at 14:59
  • i don't see where you create a consumer for VirtualTopic.gps ?? – Hassen Bennour Jan 18 '17 at 15:00
  • @HassenBennour: that code is responsible to create a number of queues and consumers based on the names configured in a property file. That code doesnt have anything to do with the issue.. – Sabya Jan 18 '17 at 15:02
  • try my code snippet of updated answer and config `` – Hassen Bennour Jan 18 '17 at 15:38
  • thanks a lot. you saved my day :) – Sabya Jan 18 '17 at 16:17

2 Answers

Thank you so much Hassen..

Adding this line `<virtualTopic name=">" selectorAware="false" />` to activemq.xml did the trick.

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeQueue name="VirtualTopic.Notifications"
                    forwardOnly="false">
                    <forwardTo>
                        <filteredDestination selector="JMSType = 'SMS'"
                            queue="Consumer.SMS.VirtualTopic" />
                        <filteredDestination selector="JMSType ='EMAIL'"
                            queue="Consumer.EMAIL.VirtualTopic" />
                    </forwardTo>
                </compositeQueue>
                <virtualTopic name=">" selectorAware="false" />
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
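
For anyone wondering why this helps: my understanding (please treat it as an assumption, it is not confirmed anywhere in this thread) is that once you declare your own `<destinationInterceptors>`, the broker no longer applies its built-in virtual topic rule, so VirtualTopic.gps stopped feeding the `Consumer.*.VirtualTopic.gps` queues. The `>` wildcard in the added line makes every topic a virtual topic again. The sketch below is plain Java with no broker involved; `matches` and `consumerQueue` are hypothetical helpers that only model the wildcard and the `Consumer.<name>.<topic>` queue naming, and edge cases are simplified.

```java
// Broker-free model of virtual topic name matching and consumer queue naming.
// Both helpers are hypothetical; they are not part of the ActiveMQ API.
class VirtualTopicSketch {

    // Simplified wildcard match: "." separates words, "*" matches exactly
    // one word, ">" matches everything that remains.
    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\.");
        String[] n = name.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true;
            }
            if (i >= n.length) {
                return false;
            }
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false;
            }
        }
        return p.length == n.length;
    }

    // Queue that a named subscriber reads from, using the "Consumer.*." prefix.
    static String consumerQueue(String subscriber, String topic) {
        return "Consumer." + subscriber + "." + topic;
    }

    public static void main(String[] args) {
        String topic = "VirtualTopic.gps";
        System.out.println(matches(">", topic));
        System.out.println(matches("VirtualTopic.Notifications", topic));
        System.out.println(consumerQueue("A", topic));
    }
}
```

Here `consumerQueue("A", "VirtualTopic.gps")` gives `Consumer.A.VirtualTopic.gps`, which is the queue name the consumer in the other answer reads from.
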
Sabya

The following example shows how to set up a `<compositeQueue/>` element in the XML configuration so that a message sent to MY.QUEUE is actually forwarded to the physical queue FOO and the topic BAR.

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="MY.QUEUE">
            <forwardTo>
              <queue physicalName="FOO" />
              <topic physicalName="BAR" />
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

By default, subscribers cannot consume messages directly from a composite queue or topic; it is a logical construct only. Given the configuration above, subscribers can consume messages only from FOO and BAR, not from MY.QUEUE. This behaviour can be changed, for use cases such as watching a queue by sending the same messages to a notification topic (wire tapping), by setting the optional forwardOnly attribute to false.

    <compositeQueue name="IncomingOrders" forwardOnly="false">
        <forwardTo>
            <topic physicalName="Notifications" />
        </forwardTo>
    </compositeQueue>

Messages sent to IncomingOrders will all be copied and forwarded to Notifications, before being placed on the physical IncomingOrders queue for consumption by subscribers.

Take a look here: http://activemq.apache.org/virtual-destinations.html

With your current config you can consume only from the SMS and EMAIL queues; if you want to consume from Notifications as well, you need to set forwardOnly="false".
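
To make the routing rule concrete, here is a small broker-free sketch in plain Java. It only models which destinations would receive a message for a given JMSType and forwardOnly setting, assuming each `filteredDestination` selector behaves like a simple equality check on JMSType. The class `CompositeQueueSketch` and its `route` method are made up for illustration and are not part of ActiveMQ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Broker-free model of a compositeQueue with filteredDestination entries.
// Hypothetical helper class, not an ActiveMQ API.
class CompositeQueueSketch {
    static final String COMPOSITE = "VirtualTopic.Notifications";

    // JMSType value -> target queue, mirroring the selectors in the config.
    static final Map<String, String> FILTERS = new LinkedHashMap<>();
    static {
        FILTERS.put("SMS", "Consumer.SMS.VirtualTopic");
        FILTERS.put("EMAIL", "Consumer.EMAIL.VirtualTopic");
    }

    // Returns the destinations that would receive a message with this JMSType.
    static List<String> route(String jmsType, boolean forwardOnly) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> filter : FILTERS.entrySet()) {
            // Stands in for the selector "JMSType = '<key>'".
            if (filter.getKey().equals(jmsType)) {
                targets.add(filter.getValue());
            }
        }
        // With forwardOnly="false" the composite queue keeps a copy as well.
        if (!forwardOnly) {
            targets.add(COMPOSITE);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(route("SMS", false));
        System.out.println(route("EMAIL", true));
        System.out.println(route("PUSH", true));
    }
}
```

In this model a message whose JMSType matches no selector reaches no forwarded queue, and it stays consumable on VirtualTopic.Notifications only when forwardOnly is false.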

UPDATE: Try this code:

    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;

    public class SimpleSenderConsumerVirtualTopic {

        public static void main(String[] args) throws JMSException {
            Connection conn = null;
            try {
                // The no-arg factory connects to the default broker URL.
                ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
                conn = cf.createConnection();
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Consumer "A" reads from the queue that backs the virtual topic.
                MessageConsumer consumer = session
                        .createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
                // The producer publishes to the topic itself.
                MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
                conn.start();

                producer.send(session.createTextMessage("VirtualTopic.gps test"));

                // Keep receiving until no message arrives within 5 seconds.
                TextMessage msg;
                while ((msg = (TextMessage) consumer.receive(5000)) != null) {
                    System.out.println("Received message is: " + msg.getText());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception e) {
                        // Ignore failures while closing the connection.
                    }
                }
            }
        }
    }

And add this to activemq.xml:

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
          <virtualTopic name=">" selectorAware="false" />
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
Hassen Bennour
  • No Hassen, I don't want to consume from VirtualTopic.Notifications. I have one more topic (VirtualTopic.gps), for which consumers are not picking up messages. – Sabya Jan 18 '17 at 10:51
  • Adding this to the destination interceptors configuration solved the problem. – Sabya Jan 31 '18 at 11:02
  • For this reason I find it strange that you used the given answer to post your own and accepted it instead – Hassen Bennour Jan 31 '18 at 12:43