3

I would like to get a copy of the messages present in a RabbitMQ queue without consuming them. Is it possible? Thanks in advance,

k.Sugetha
  • Possible duplicate of [Using RabbitMQ is there a way to look at the queue contents without a dequeue?](https://stackoverflow.com/questions/4700292/using-rabbitmq-is-there-a-way-to-look-at-the-queue-contents-without-a-dequeue) – Ulugbek Umirov Feb 25 '19 at 13:23

3 Answers

5

I would like to get a copy of the messages present in a RabbitMQ queue without consuming them. Is it possible?

No. The closest option you have is to consume or get a message and then reject it via a negative acknowledgement.
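As a rough illustration of that get-then-reject pattern, here is a minimal sketch assuming the pika 1.x Python client (where the flag is named auto_ack). The names peek_messages, limit and demo are made up for this example, and the host and queue name in demo are placeholders. The messages are fetched first and rejected in one go at the end, because a message that is requeued immediately could simply be returned again by the next basic_get.

```python
def peek_messages(channel, queue, limit=100):
    """Fetch up to `limit` messages with basic_get, then requeue all of them."""
    bodies = []
    last_tag = None
    for _ in range(limit):
        # auto_ack=False: the broker holds each fetched message as "unacked".
        method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is None:  # nothing left in the Ready state
            break
        bodies.append(body)
        last_tag = method.delivery_tag
    if last_tag is not None:
        # multiple=True rejects every delivery up to and including last_tag;
        # requeue=True asks the broker to put them back on the queue.
        channel.basic_nack(delivery_tag=last_tag, multiple=True, requeue=True)
    return bodies


def demo(host="localhost", queue="hello"):
    """Hypothetical usage; needs the pika package and a reachable broker."""
    import pika  # imported here so the helper above has no hard dependency

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    try:
        return peek_messages(connection.channel(), queue)
    finally:
        connection.close()
```

This is still not a true read-only view: while the messages are held they are invisible to other consumers, and after the requeue they are delivered with the redelivered flag set.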


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

Luke Bakken
2

Maybe you can register a consumer (as shown here in the official doc) that never sends an ack to the broker. Note that no_ack must be False for this: with no_ack=True the broker treats every message as acknowledged as soon as it is delivered and removes it from the queue.

channel.basic_consume(callback, queue='hello', no_ack=False)

This way your consumer receives the message content, but the broker keeps the message in the Unacked state and returns it to the Ready state when your consumer exits.

Maybe this is not the cleanest way to do what you need, but it works and it's simple.
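For readers on a current client, the consumer variant might look like the sketch below. It assumes pika 1.x and a BlockingChannel, whose consume() generator yields (None, None, None) after inactivity_timeout seconds without a delivery; that idle period is used here as a simple "queue is drained" signal. The names snapshot_via_consumer and idle_seconds are invented for this example.

```python
def snapshot_via_consumer(channel, queue, idle_seconds=1.0):
    """Collect message bodies through a consumer that never acknowledges."""
    bodies = []
    # auto_ack=False: deliveries stay "unacked" on the broker side.
    for method, properties, body in channel.consume(
            queue, auto_ack=False, inactivity_timeout=idle_seconds):
        if method is None:  # idle timeout reached, assume nothing is left
            break
        bodies.append(body)
    channel.cancel()  # stop the consumer created by consume()
    channel.close()   # closing the channel lets the broker requeue the unacked messages
    return bodies
```

The channel is used up by this helper, so open a fresh one per snapshot, for example snapshot_via_consumer(connection.channel(), 'hello').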

Another, similar approach is based on the so-called pull API (as opposed to the push API you use when you register a subscriber). I used this approach in a .NET application: you can find the .NET documentation here, and I think the Python APIs are similar in that respect.

The key idea is to get a message without acknowledging it: channel.BasicGet(queueName, noAck), with noAck set to false.

I hope this can help you to move towards a full and reliable solution!

Pietro Martinelli
  • There is a problem with this approach because it puts the message in the "Unacked" state until the consumer dies. – Luke Bakken Feb 25 '19 at 15:02
  • The 'no_ack' parameter is now called 'auto_ack' (same meaning, new name) and defaults to False in basic_consume() – xCovelus May 05 '21 at 09:33
2

I found a better way to get all the messages on a queue, using the channel.basic_get() function, as demonstrated in the following code:

    import pika

    # ConfigTools is a configuration helper that is not shown in this answer.
    def __init__(self):
        self.host = ConfigTools().get_attr('host')
        self.waiting_queue = ConfigTools().get_attr('test_queue_name')

    def view_queue(self) -> list:
        """Reads everything from the queue, then disconnects, causing the server to requeue the messages.
        Returning the delivery tag is pointless at this point because the server makes the tag (an integer) up on
        the fly and the order can shuffle constantly"""
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))
        # One channel is enough: fetched messages stay unacked on it until the connection closes
        chl = connection.channel()

        msgs = []
        while True:
            # No ack is sent, so the broker still owns every message we read
            method_frame, header_frame, body = chl.basic_get(queue='test')
            if method_frame:
                print("body : ", body)
                msgs.append(body)
            else:
                # basic_get returns (None, None, None) once no ready message is left
                print("No more messages returned")
                connection.close()
                break
        return msgs

Then, if at any time I know which message on the queue I want to pop off, I can use something similar:

    def remove(self, item) -> list:
        """Removes the item from the queue. Goes through the entire queue, similar to view_queue, and acknowledges
        the msg in the list that matches, and returns the msg.
        If item matches more than one message on the queue, only one is deleted
        """

        # Message bodies arrive as bytes, so only a bytes item can ever match
        if not isinstance(item, bytes):
            raise TypeError("Item must be a single bytes object")

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))
        # Reuse one channel for the whole scan instead of opening one per message
        chl = connection.channel()

        msgs = []
        while True:
            method_frame, header_frame, body = chl.basic_get(queue='test')
            if method_frame:
                print('body: ', body)
                if body == item:
                    print("item found!")
                    msgs.append(body)
                    # Ack only the match; closing the connection requeues the rest
                    chl.basic_ack(method_frame.delivery_tag)
                    connection.close()
                    return msgs
            else:
                print("Message not found")
                connection.close()
                break
        return msgs

Note: I'm using this for a small application that never has more than fifty messages on a queue. I cannot say how this function will hold up in larger applications.

Jeremy Caney
Lacrosse343