24

I'm researching queuing solutions for one of my team's apps. Ideally we would like something that can be configured both as a lightweight, in-process broker (for low-throughput messaging between threads) and as an external broker. Is there an MQ server out there that can do this? Most seem to require setup as an external entity. ZeroMQ appears to come the closest to an in-process solution, but it seems to be more of a "UDP socket on steroids", and we need reliable delivery.

Jean-Philippe Bond
Evan Haas
  • I think that the answers to http://stackoverflow.com/questions/2507536/lightweight-jms-broker contain interesting info (e.g. the [ffmq](http://ffmq.sourceforge.net/index.html) suggestion). ActiveMQ is another, albeit heavier, candidate, but it's also embeddable. – fvu Jan 15 '13 at 16:46
  • Like @fvu said, ActiveMQ is a bit heavier than ZeroMQ, but it works really well as an embedded process. And if you are using Spring, it is really easy to set up. – Jean-Philippe Bond Jan 15 '13 at 16:54
  • ZeroMQ runs, among other things, on top of TCP (not UDP), which provides a reliable transport. However, are you referring to a persistent queue, i.e. one backed by disk? – Jakob Möllås Jan 15 '13 at 16:56
  • We don't care about persisting the messages per se, but we do want delivery to be as reliable as possible. Performance is not an issue (yet). I'm assuming that means the queue should be persistent. – Evan Haas Jan 15 '13 at 21:07
  • @fvu Thanks for the input, I am going with ActiveMQ. Could you repost as an answer so I can accept? – Evan Haas Jan 16 '13 at 16:46
  • @Jean-PhilippeBond Thanks for the input, I am going with ActiveMQ. If you repost as an answer, I will upvote. – Evan Haas Jan 16 '13 at 16:46
  • @Evan OK, no problem. I can post an example of Spring with ActiveMQ as an embedded process if you want; I already have something done. – Jean-Philippe Bond Jan 16 '13 at 16:49
  • @Jean-PhilippeBond Thanks, that would be great as we will be configuring ActiveMQ with Spring. – Evan Haas Jan 16 '13 at 16:52
  • @EvanHaas Thanks for the confidence, but may I suggest you accept Jean-Philippe's answer instead? I just wrote a quick oneliner, his extensive answer contains a lot of useful stuff. – fvu Jan 17 '13 at 00:21

2 Answers

12

As we said, ActiveMQ is a bit heavier than ZeroMQ, but it works really well as an embedded process. Here is a simple example with Spring and ActiveMQ.

The message listener that will be used to test the queue:

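package com.example.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class);

    @Override
    public void onMessage(Message message) {

        /* Receive the text message */
        if (message instanceof TextMessage) {

            try {
                String text = ((TextMessage) message).getText();
                System.out.println("Message reception from the JMS queue : " + text);

            } catch (JMSException e) {
                /* Pass the exception itself so the stack trace is logged */
                logger.error("Error while reading the text message", e);
            }

        } else {
            /* Handle non text message */
        }
    }
}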

ActiveMQ context configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61617</value>
        </property>
    </bean>

    <bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <constructor-arg ref="jmsQueueConnectionFactory" />
    </bean>

    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="messageQueue" />
    </bean>

    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="pooledJmsQueueConnectionFactory" />
       <property name="pubSubDomain" value="false"/>
    </bean>

    <bean id="testMessageListener" class="com.example.jms.TestMessageListener" />

    <bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="testMessageListener" />
        <property name="concurrentConsumers" value="5" />
        <property name="acceptMessagesWhileStopping" value="false" />
        <property name="recoveryInterval" value="10000" />
        <property name="cacheLevelName" value="CACHE_CONSUMER" /> 
    </bean>

</beans>
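
Note that the connection factory above points at tcp://localhost:61617, so a broker has to be listening on that port. A minimal sketch of one way to get that inside the same JVM, assuming you add a bean like the following inside the <beans> element (the bean id and broker name `embeddedBroker` are hypothetical, and disabling persistence and JMX is an assumption made to keep the example light):

```xml
<!-- Assumption: starts a non-persistent ActiveMQ broker inside the Spring
     context, listening on the port the connection factory expects. -->
<bean id="embeddedBroker" class="org.apache.activemq.broker.BrokerService"
      init-method="start" destroy-method="stop">
    <property name="brokerName" value="embeddedBroker" />
    <property name="persistent" value="false" />
    <property name="useJmx" value="false" />
    <property name="transportConnectorURIs">
        <list>
            <value>tcp://localhost:61617</value>
        </list>
    </property>
</bean>
```

You would probably also want depends-on="embeddedBroker" on the connection factory bean so the broker is started first. If no TCP port is needed at all, another option is to set brokerURL to vm://localhost?broker.persistent=false, which uses ActiveMQ's VM transport and creates an in-JVM broker on the first connection.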

The JUnit test:

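import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

@ContextConfiguration(locations = {"classpath:/activeMQ-context.xml"})
public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests {

    @Autowired
    private JmsTemplate template;

    @Autowired
    private ActiveMQDestination destination;

    @Test
    public void testJMSFactory() {
        /* sending a message */
        template.convertAndSend(destination, "Hi");

        /*
         * receiving a message: the listener container reads from the same
         * queue and will usually get the message first, so bound the wait
         * instead of blocking forever (receive() then returns null)
         */
        template.setReceiveTimeout(1000);
        Message msg = template.receive(destination);
        if (msg instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) msg).getText());
            } catch (JMSException e) {
                System.out.println("Error : " + e.getMessage());
            }
        }
    }
}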

The dependencies to add to the pom.xml:

<!-- Spring -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${org.springframework-version}</version>
</dependency>

<!-- ActiveMQ -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.6.0</version>
    <scope>compile</scope>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.6.0</version>
</dependency>
Jean-Philippe Bond
  • Issues we have seen with ActiveMQ (and why I'm currently doing a quick search for alternatives): (a) when you shut it down, it doesn't stop all its threads. (b) many of its APIs auto-start services on construction while also having start()-like methods, making the API unclear and possibly contributing to (a). (c) a multitude of issues with multi-threading, which to date have been "fixed" by them adding more synchronized blocks. (d) dubious security, particularly when used in discovery mode. (Pro-tip: set up a rogue message queue server for fun times around the office!) – Hakanai May 08 '15 at 01:29
  • Do you know if the thread shutdown problem has been resolved? Do you still use this solution? – Mike Feb 15 '19 at 17:32
1

The WebSphere MQ client has the capability to perform multicast pub/sub. This provides a client-to-client capability that bypasses the queue manager, although a queue manager is required to establish the connection.

T.Rob