Please explain how to configure a Java client to read from two different RabbitMQ exchanges without polling. I would like the client to awaken when a message arrives, then block again.

In my little system integration problem, one RabbitMQ exchange carries work messages using a variety of routing keys (I know how to use wildcards to catch them), and the other exchange carries control messages (e.g., "stop"). So my client has to listen for messages from both places. This is a relatively low-volume system, so I'm not asking about load sharing, fairness, etc.

Sure, I could run a thread that polls each exchange, sleeps, dispatches, forever. But I would like to avoid polling.

I'm somehow reminded of the Unix select() system call that awakens when data is ready on any of the file descriptors handed to it. Does RabbitMQ have something similar?

My current solution is an adapter that spins up a thread to block on each input exchange; upon receipt each thread writes to a java.util.concurrent collection; and I use yet another thread to block on that collection and deliver messages as they arrive to the ultimate consumer. It works fine but if I can chop out this complexity, that would be great.
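For reference, the adapter described above might be sketched roughly as follows. This is only an illustration under stated assumptions: the class name `FanInAdapter`, the `Tagged` wrapper, and the in-memory `BlockingQueue<String>` sources are all hypothetical stand-ins for the per-exchange blocking reads, so nothing here talks to RabbitMQ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Sketch of a fan-in adapter: one reader thread per source forwards into a
 * single merged queue, and the consumer blocks only on that merged queue.
 * The sources are plain in-memory queues (an assumption for illustration).
 */
public class FanInAdapter {

    /** A message tagged with the (hypothetical) name of its source. */
    public static final class Tagged {
        public final String source;
        public final String body;

        public Tagged(String source, String body) {
            this.source = source;
            this.body = body;
        }
    }

    private final BlockingQueue<Tagged> merged = new LinkedBlockingQueue<>();

    /** Starts a daemon thread that blocks on one source and forwards each message. */
    public Thread attach(final String name, final BlockingQueue<String> source) {
        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    // take() blocks until a message is available: no polling
                    merged.put(new Tagged(name, source.take()));
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread end
                Thread.currentThread().interrupt();
            }
        }, "reader-" + name);
        reader.setDaemon(true);
        reader.start();
        return reader;
    }

    /** Blocks until a message from any attached source arrives. */
    public Tagged next() throws InterruptedException {
        return merged.take();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> work = new LinkedBlockingQueue<>();
        BlockingQueue<String> control = new LinkedBlockingQueue<>();

        FanInAdapter adapter = new FanInAdapter();
        adapter.attach("work", work);
        adapter.attach("control", control);

        work.put("job-1");
        control.put("stop");

        // Arrival order across the two sources is not guaranteed
        for (int i = 0; i < 2; i++) {
            Tagged t = adapter.next();
            System.out.println(t.source + ": " + t.body);
        }
    }
}
```

The cost of this design is one extra thread per source plus the hand-off queue, which is the complexity I would like to remove.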

These SO posts dance around the issue; please feel free to rub my nose in the solution if I've overlooked it in these posts:

For java: RabbitMQ by Example: Multiple Threads, Channels and Queues

For C#: Reading from multiple queues, RabbitMQ

Thanks in advance.

chrisinmtown
    have you looked at the tutorials? http://www.rabbitmq.com/tutorials/tutorial-three-java.html you need a thread per consumer and use the consumer.nextDelivery() method to block while waiting for each message. – robthewolf Mar 13 '14 at 10:09

1 Answer

Thanks, robthewolf, for the comment. Yeah, I've read the tutorials and I know I need a thread per consumer.

Turns out it's straightforward to read from multiple exchanges with a single thread, and no polling at all: declare a new queue and bind it to all the relevant exchanges. This works for both topic and fanout exchanges. I tested this with an SSCCE, see below.

I lament the lack of detail in the RabbitMQ javadoc; a few choice words in the Channel#queueBind(String, String, String) method would have helped a lot.

HTH

package rabbitExample;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Demonstrates reading messages from two exchanges via a single queue monitored
 * by a single thread.
 * 
 */
public class MultiExchangeReadTest implements Runnable {

private final String exch1 = "my.topic.exchange";
private final String exch2 = "my.fanout.exchange";
private final Channel channel;
private final QueueingConsumer consumer;

public MultiExchangeReadTest(final String mqHost) throws Exception {

    // Connect to server
    System.out.println("Connecting to host " + mqHost);
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(mqHost);
    Connection connection = factory.newConnection();
    channel = connection.createChannel();

    // Declare exchanges; use defaults for durable etc.
    channel.exchangeDeclare(exch1, "topic");
    channel.exchangeDeclare(exch2, "fanout");

    // Get a new, unique queue name
    final String queue = channel.queueDeclare().getQueue();

    // Bind the queue to the exchanges; topic gets non-empty routing key
    channel.queueBind(queue, exch1, "my.key");
    channel.queueBind(queue, exch2, "");

    // Consume with auto-ACK. Note: the prefetch limit set by basicQos applies
    // only to unacknowledged deliveries, so it has no effect with auto-ACK.
    channel.basicQos(1);
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue, true, consumer);
}

public void run() {
    // Reads messages until interrupted
    try {
        while (true) {
            // Wait for a message
            System.out.println("Awaiting message");
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Show contents using default encoding scheme
            String body = new String(delivery.getBody());
            System.out.println("Message from exch "
                    + delivery.getEnvelope().getExchange() + ", key '"
                    + delivery.getEnvelope().getRoutingKey() + "':\n"
                    + body);
        } // while
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

public static void main(String[] args) throws Exception {
    if (args.length != 1) {
        System.err
                .println("Usage: MultiExchangeReadTest.main mq-host-name");
    } else {
        MultiExchangeReadTest multiReader = new MultiExchangeReadTest(
                args[0]);
        multiReader.run();
    }
}
}
chrisinmtown