In my (limited) experience with RabbitMQ, if you create a new listener for a queue that doesn't exist yet, the queue is created automatically. I'm trying to use the Spring AMQP project with RabbitMQ to set up a listener, and I'm getting an error instead. This is my XML config:

<rabbit:connection-factory id="rabbitConnectionFactory" host="172.16.45.1" username="test" password="password" />

<rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
    <rabbit:listener ref="testQueueListener" queue-names="test" />
</rabbit:listener-container>

<bean id="testQueueListener" class="com.levelsbeyond.rabbit.TestQueueListener"> 
</bean>

I get this in my RabbitMQ logs:

=ERROR REPORT==== 3-May-2013::23:17:24 ===
connection <0.1652.0>, channel 1 - soft error:
{amqp_error,not_found,"no queue 'test' in vhost '/'",'queue.declare'}

And a similar error from Spring AMQP:

2013-05-03 23:17:24,059 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] (SimpleAsyncTaskExecutor-1) - Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.

It would seem from the stack trace that the queue is being declared in "passive" mode. Can anyone point out how I would create the queue without using passive mode so I don't see this error? Or am I missing something else?

eric

5 Answers

Older thread, but this still shows up pretty high on Google, so here's some newer information:

2015-11-23

Since Spring 4.2.x with Spring-Messaging, Spring-Amqp 1.4.5.RELEASE and Spring-Rabbit 1.4.5.RELEASE, declaring exchanges, queues and bindings has become very simple through a @Configuration class and some annotations:

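import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.annotation.PropertySources;

@EnableRabbit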
@Configuration
@PropertySources({
    @PropertySource("classpath:rabbitMq.properties")
})
public class RabbitMqConfig {    
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.port:5672}")
    private int port;

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        logger.info("Creating connection factory with: " + username + "@" + host + ":" + port);

        return connectionFactory;
    }

    /**
     * Required for executing administration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * This queue will be declared. This means it will be created if it does not exist. Once declared, you can do something
     * like the following:
     * 
     * @RabbitListener(queues = "#{@myDurableQueue}")
     * @Transactional
     * public void handleMyDurableQueueMessage(CustomDurableDto myMessage) {
     *    // Anything you want! A non-void return value is sent as a reply to the message's replyTo address
     *    // (or to a @SendTo destination), not back onto the queue attached to @RabbitListener
     * }
     */
    @Bean
    public Queue myDurableQueue() {
        // This queue has the following properties:
        // name: my_durable
        // durable: true
        // exclusive: false
        // auto_delete: false
        return new Queue("my_durable", true, false, false);
    }

    /**
     * The following is a complete declaration of an exchange, a queue and an exchange-queue binding
     */
    @Bean
    public TopicExchange emailExchange() {
        return new TopicExchange("email", true, false);
    }

    @Bean
    public Queue inboundEmailQueue() {
        return new Queue("email_inbound", true, false, false);
    }

    @Bean
    public Binding inboundEmailExchangeBinding() {
        // Important part is the routing key -- this is just an example
        return BindingBuilder.bind(inboundEmailQueue()).to(emailExchange()).with("from.*");
    }
}

Some sources and documentation to help:

  1. Spring annotations
  2. Declaring/configuration RabbitMQ for queue/binding support
  3. Direct exchange binding (for when routing key doesn't matter)

Note: Looks like I missed a version -- starting with Spring AMQP 1.5, things get even easier as you can declare the full binding right at the listener!
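
As a rough illustration of that note, here is a minimal sketch of a listener that declares its own queue, exchange and binding through @QueueBinding. The class name InboundEmailListener and the String payload type are assumptions made for the example; the queue, exchange and routing key names are reused from the configuration above. It still assumes a RabbitAdmin bean and @EnableRabbit are present in the context, since the admin is what performs the declarations.

```java
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hypothetical listener class; only the annotations come from Spring AMQP.
@Component
public class InboundEmailListener {

    // Declares the durable queue "email_inbound", the durable topic exchange "email",
    // and a binding between them with routing key "from.*", then listens on the queue.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "email_inbound", durable = "true"),
            exchange = @Exchange(value = "email", type = ExchangeTypes.TOPIC, durable = "true"),
            key = "from.*"))
    public void handleInboundEmail(String body) {
        // Assumes a text payload handled by the default message converter.
        System.out.println("Received: " + body);
    }
}
```

With this style the separate Queue, TopicExchange and Binding beans for the email example should no longer be needed, though keeping them is harmless as long as the settings match.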

Jaymes Bearden

What seemed to resolve my issue was adding an admin along with a queue declaration. Here is my XML:

<rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
    <rabbit:listener ref="orderQueueListener" queues="test.order" />
</rabbit:listener-container>

<rabbit:queue name="test.order"></rabbit:queue>

<rabbit:admin id="amqpAdmin" connection-factory="rabbitConnectionFactory"/>

<bean id="orderQueueListener" class="com.levelsbeyond.rabbit.OrderQueueListener">   
</bean>
eric

As of Spring Boot 2.1.6 and Spring AMQP 2.1.7, you can create queues during startup if they don't exist with this:

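import javax.annotation.PostConstruct;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.stereotype.Component;

@Component
public class QueueConfig {

    // Injected through the constructor; Spring Boot auto-configures an AmqpAdmin bean
    private final AmqpAdmin amqpAdmin;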

    public QueueConfig(AmqpAdmin amqpAdmin) {
        this.amqpAdmin = amqpAdmin;
    }

    @PostConstruct
    public void createQueues() {
        amqpAdmin.declareQueue(new Queue("queue_one", true));
        amqpAdmin.declareQueue(new Queue("queue_two", true));
    }
}
Rafael Renan Pacheco

Can you add this after your connection tag, but before the listener:

<rabbit:queue name="test" auto-delete="true" durable="false" passive="false" />

Unfortunately, according to the XSD schema, the passive attribute (listed above) is not valid. However, in every queue_declare implementation I've seen, passive has been a valid queue_declare parameter. I'm curious to see whether that will work or whether they plan to support it in future.
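
For readers unsure what the passive flag changes, here is a toy in-memory model of the queue.declare behaviour described above. It is only a sketch: ToyBroker and its methods are invented for illustration and are not part of RabbitMQ or Spring AMQP, and the model ignores everything except queue existence and the durable flag.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of AMQP 0-9-1 queue.declare semantics; not a real broker client.
final class ToyBroker {
    // Queue name -> durable flag, standing in for the broker's queue registry.
    private final Map<String, Boolean> queues = new HashMap<>();

    /**
     * passive = true:  only checks that the queue exists, never creates it.
     * passive = false: creates the queue if it is missing (idempotent for equal settings).
     */
    void declare(String name, boolean passive, boolean durable) {
        Boolean existing = queues.get(name);
        if (existing == null) {
            if (passive) {
                // Mirrors the not_found soft error shown in the question's broker log.
                throw new IllegalStateException("not_found: no queue '" + name + "'");
            }
            queues.put(name, durable);
        } else if (!passive && existing != durable) {
            // A real broker also rejects a redeclaration with different settings.
            throw new IllegalStateException("precondition_failed: queue '" + name + "'");
        }
    }

    boolean exists(String name) {
        return queues.containsKey(name);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        try {
            broker.declare("test", true, false);  // passive check on a missing queue fails
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        broker.declare("test", false, false);     // non-passive declare creates the queue
        broker.declare("test", true, false);      // the same passive check now succeeds
        System.out.println(broker.exists("test"));
    }
}
```

In this model the listener's failure corresponds to the first call: a passive declare can confirm a queue but never create one, so something else has to issue the non-passive declare first.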

Here is the full list of options for a queue declaration: http://www.rabbitmq.com/amqp-0-9-1-reference.html#class.queue

And here is the full XSD for the spring rabbit schema (with comments included): http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd

Homer6
  • Unfortunately, I get an error that the "passive" attribute isn't allowed in the queue element on startup. It is weird that the "declare" method takes the passive argument, but I can't seem to define it in xml. – eric May 04 '13 at 17:19
  • Sorry I'm being dense- I don't see what I'm missing. I see you're saying it's not valid xml according to the xsd and I can ignore the error eclipse gives me- but I actually get a runtime error as well. – eric May 05 '13 at 21:15
  • That's okay. My middle name is dense. :-) I think we're saying the same thing. IMHO, the above passive parameter should be present if it's a full rabbit interface. Spring RabbitMQ is missing it. I would file a bug report with them and link back to this question in your bug report. They may know something that we're both missing or it may just be a legitimate omission. – Homer6 May 06 '13 at 04:47

If you were previously using a spring-rabbit version below 1.6, have since upgraded to 1.6 or later, and find that your queues aren't being created, you are most likely missing a RabbitAdmin bean. Earlier versions don't seem to need one in the context, but 1.6 and later do.

Nanotron