I want to consume a queue (RabbitMQ) synchronously with blocking.

Note: full, runnable code is included below.

Our system uses RabbitMQ as its queuing system, but one of our modules does not need asynchronous consumption.

I've tried using basic_get on top of a BlockingConnection, which doesn't block (returns (None, None, None) immediately):

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

I've also tried the consume generator, which fails with "Connection Closed" after a long period without consuming:

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)

Is there a way to use RabbitMQ through the pika client the way I would use a Queue.Queue in Python, or anything similar?

My only option at the moment is to busy-wait (using basic_get), but I'd rather rely on the existing system and avoid busy-waiting if possible.

Full code:

#!/usr/bin/env python
import pika
import time

TEST_QUEUE = 'test'
def get_connection():
        # define connection
        connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                        host=YOUR_IP,
                        port=YOUR_PORT,
                        credentials=pika.PlainCredentials(
                                username=YOUR_USER,
                                password=YOUR_PASSWORD,
                        )
                )
        )
        return connection

# declare queue
get_connection().channel().queue_declare(TEST_QUEUE)
def blocking_get_1():

        channel = get_connection().channel()

        # get from an empty queue (prints immediately)
        print channel.basic_get(TEST_QUEUE)

def blocking_get_2():
        channel = get_connection().channel()
        # put messages in TEST_QUEUE
        for i in range(4):
                channel.basic_publish(
                        '',
                        TEST_QUEUE,
                        'body %d' % i
                )
        consume_generator = channel.consume(TEST_QUEUE)
        print next(consume_generator)
        time.sleep(14400)
        print next(consume_generator)


print "blocking_get_1"
blocking_get_1()

print "blocking_get_2"
blocking_get_2()

get_connection().channel().queue_delete(TEST_QUEUE)
Reut Sharabani
  • I think it also has to do with not sending the heartbeat (`consume` possibly blocks them?) as seen here: http://stackoverflow.com/questions/14572020/handling-long-running-tasks-in-pika-rabbitmq – Reut Sharabani Oct 21 '14 at 06:48
  • I posted my take on this, but let me know if I misunderstood your question. :) – eandersson Oct 22 '14 at 10:26

1 Answer

A common problem with Pika is that it does not currently handle incoming events in the background. In practice, this means that in many scenarios you need to call connection.process_data_events() periodically so that it does not miss heartbeats.

This also means that if you sleep for an extended period of time, pika will not be handling incoming data, and the connection will eventually die because it is not responding to heartbeats. One option here is to disable heartbeats.
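To illustrate the point about long sleeps, a time.sleep() call can be replaced by a loop that keeps handing control back to the connection. This is only a minimal sketch: sleep_with_heartbeats and slice_seconds are hypothetical names, and connection is assumed to be a pika BlockingConnection (or anything exposing process_data_events(time_limit=...)). BlockingConnection also has a sleep(duration) method that should serve the same purpose.

```python
import time


def sleep_with_heartbeats(connection, total_seconds, slice_seconds=1.0):
    """Wait for total_seconds while still letting the connection do I/O.

    `connection` is assumed to be a pika BlockingConnection. The function
    name and `slice_seconds` are illustrative, not part of pika.
    Returns the number of process_data_events() calls that were made.
    """
    deadline = time.monotonic() + total_seconds
    calls = 0
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return calls
        # process_data_events() may return early if it handled something,
        # so the loop is driven by the deadline, not by a call count.
        connection.process_data_events(
            time_limit=min(slice_seconds, remaining))
        calls += 1
```

In blocking_get_2 from the question, this would stand in for time.sleep(14400), for example sleep_with_heartbeats(channel.connection, 14400), assuming the channel exposes its connection that way.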

I usually solve this by having a thread in the background check for new events, as seen in this example.
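The linked example is not reproduced here, so the following is only a rough sketch of the general idea, not the author's code. EventPump is a hypothetical name. Because pika connections are not thread-safe, the sketch assumes every other thread that touches the connection takes the same lock.

```python
import threading


class EventPump(threading.Thread):
    """Periodically calls connection.process_data_events() in the background.

    `connection` is assumed to be a pika BlockingConnection and `lock` a
    threading.Lock shared with all other users of that connection.
    """

    def __init__(self, connection, lock, interval=1.0):
        super().__init__(daemon=True)
        self._connection = connection
        self._lock = lock
        self._interval = interval
        self._stop_event = threading.Event()

    def run(self):
        # Event.wait() returns False on timeout, True once stop() is called.
        while not self._stop_event.wait(self._interval):
            with self._lock:
                # time_limit=0: handle whatever is pending, then return.
                self._connection.process_data_events(time_limit=0)

    def stop(self):
        self._stop_event.set()
        self.join()
```

A consumer would start the pump once after connecting, wrap its own channel calls in `with lock:`, and call stop() before closing the connection.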

If you want to block completely I would do something like this (based on my own library AMQPStorm).

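import time

# `channel` is assumed to be an open AMQPStorm channel.
while True:
    message = channel.basic.get(queue='simple_queue', no_ack=False)
    if message:
        print("Message:", message.body)
        message.ack()
    else:
        # Queue is empty: wait a second before polling again.
        print("Channel Empty.")
        time.sleep(1)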

This is based on the example found here.
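For pika itself, something closer to Queue.Queue.get() might be built on the consume generator with an inactivity timeout, which (as far as I know) keeps processing connection events while it waits. This is a sketch under assumptions rather than a definitive implementation: blocking_get and poll_seconds are hypothetical names, channel is assumed to be a pika BlockingChannel, and queue is the Python 3 name of the standard-library module (Queue in Python 2).

```python
import queue
import time


def blocking_get(channel, queue_name, timeout=None, poll_seconds=1.0):
    """Block until a message arrives on queue_name and return its body.

    Raises queue.Empty if `timeout` seconds pass without a message.
    `channel` is assumed to be a pika BlockingChannel; this helper is
    illustrative and not part of pika.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    # With inactivity_timeout set, consume() yields (None, None, None)
    # whenever poll_seconds pass without a delivery.
    for method, _properties, body in channel.consume(
            queue_name, inactivity_timeout=poll_seconds):
        if method is not None:
            channel.basic_ack(method.delivery_tag)
            return body
        if deadline is not None and time.monotonic() >= deadline:
            raise queue.Empty
    # The generator ended, for example because the consumer was cancelled.
    raise queue.Empty
```

Calling blocking_get again on the same channel with the same queue resumes the existing consumer. Since the broker may already have delivered further messages to that consumer, call channel.cancel() when finished so that unacknowledged messages are returned to the queue.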

eandersson
  • I remember having trouble when accessing the connection from two threads. Inter-thread communication adds overhead so I'm going to wait for a way to do it without it. I'll give it another go later on and update here. – Reut Sharabani Oct 26 '14 at 08:03
  • Yeah, if you are using pika it can be difficult. It is not designed for threading, but the example I linked can handle quite a lot of simultaneous messages. My library amqp-storm, on the other hand, should make it easier, as it is thread safe. – eandersson Oct 26 '14 at 14:41
  • the links provided are outdated... – Ouss Dec 14 '21 at 17:27