0

I have created a simple Console Application listening to messages on Rabbit MQ. Works fine. No problems there. But how do I close the connection. I've been googleing arround a lot, but I didn't find any clear answers. The closest I got to an answer is this SO question: What is the best way to safely end a java application with running RabbitMQ consumers, but the answer omits the most important part: The code!

Here is my code:

package com.company;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

interface ObjectRecievedListener {
    void objectRecieved(Object obj);
}

public class RabbitMQReceiver extends RabbitMQBase
{

    ArrayList<DefaultConsumer> consumers = new ArrayList<>();

    private List<ObjectRecievedListener> listeners = new ArrayList<>();
    private final Connection connection;
    private final Channel channel;

    public RabbitMQReceiver(RabbitMQProperties properties) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(properties.getHost());
        factory.setPassword(properties.getPassword());
        factory.setUsername(properties.getLogin());

        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(properties.getInboundQueueName(), true, false, false, null);
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");


                try {
                    doWork(message);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        consumers.add((DefaultConsumer) consumer);

        boolean autoAck = false;
        channel.basicConsume(properties.getInboundQueueName(), autoAck, consumer);
    }

    public void addListener(ObjectRecievedListener listener) {
        listeners.add(listener);
    }

    public void stop() {
        try {
            channel.close();
            connection.close();
        } catch (Exception e) {
        }
    }

    private void doWork(String message) {
        Object obj = getObjectFromXML(message);

        for (ObjectRecievedListener l : listeners)
            l.objectRecieved(obj);
        stop();
    }
}

But it doesn't stop just because I called stop()

So in short: How do I close the connection to Rabbit MQ?

Jens Borrisholt
  • 6,174
  • 1
  • 33
  • 67

2 Answers2

1

The RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.


When you say "it doesn't stop" have you confirmed that the stop() method is actually called? Is your code hanging on one of the close() methods?

The channel instance will have a basicCancel method that you could call to cancel the current consumer before closing the channel and connection. To be honest, closing the channel will cancel your consumer so I doubt this is the root cause of the issue.

Luke Bakken
  • 8,993
  • 2
  • 20
  • 33
-2

Try this:

let connection = null;

    connection = await amqp.connect(`amqp://${config.username}:${config.password}@${config.host}:${config.port}`);

    connection.close();
Saad Ahmed
  • 700
  • 1
  • 8
  • 15