4

I'm working on a Python app with a background thread that consumes messages from a RabbitMQ queue (topic scenario).

I start the thread from the on_click event of a Button. Here is my code; please pay attention to the commented-out line "#self.receive_command()".

def on_click_start_call(self,widget):


    t_msg = threading.Thread(target=self.receive_command)
    t_msg.start()
    t_msg.join(0)
    #self.receive_command()


def receive_command(self):

    syslog.syslog("ENTERED")

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    syslog.syslog("1")

    channel = connection.channel()
    syslog.syslog("2")

    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    syslog.syslog("3")

    result = channel.queue_declare(exclusive=True)
    syslog.syslog("4")

    queue_name = result.method.queue
    syslog.syslog("5")

    def callback_rabbit(ch,method,properties,body):
        syslog.syslog("RECEIVED MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")

    syslog.syslog("6")

    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    syslog.syslog("7")

    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    syslog.syslog("8")

    channel.start_consuming()

If I run this code, none of the numbered messages ("1" through "8") appear in syslog; I only see "ENTERED". So the code is blocked at pika.BlockingConnection.

If I run the same code with the thread lines commented out and the direct call to the function uncommented, everything works as expected and messages are received correctly.

Is there any way to run a consumer in a thread?

Thanks in advance

Davide

user2056899

3 Answers

8

I have tested the code on my machine, with the latest version of Pika. It works fine. There are threading issues with Pika, but as long as you create one connection per thread it shouldn't be a problem.

If you are experiencing issues, the most likely cause is a bug in an older version of Pika, or an unrelated problem in your own threading code.

I would recommend that you avoid 0.9.13, as it has multiple bugs; 0.9.14/0.10.0 should be released very soon™.

[Edit] Pika 0.9.14 has been released.

This is the code I used.

import threading

import pika


def receive_command():
    print("ENTERED")
    # The connection is created inside the thread that uses it.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    print("1")
    channel = connection.channel()
    print("2")
    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    print("3")
    # Exclusive, server-named queue that belongs to this connection only.
    result = channel.queue_declare(exclusive=True)
    print("4")
    queue_name = result.method.queue
    print("5")

    def callback_rabbit(ch, method, properties, body):
        # format() avoids a str + bytes TypeError on Python 3, where body is bytes.
        print("RECEIVED MSG: RKEY:{} MSG: {}\n".format(method.routing_key, body))

    print("6")
    channel.queue_bind(exchange='STORE_CMD', queue=queue_name, routing_key='test.routing.key')
    print("7")
    channel.basic_consume(callback_rabbit, queue=queue_name, no_ack=True)
    print("8")
    # Blocks this thread and dispatches incoming messages to callback_rabbit.
    channel.start_consuming()


def start():
    t_msg = threading.Thread(target=receive_command)
    t_msg.start()
    # join(0) returns immediately; it does not wait for the consumer to finish.
    t_msg.join(0)


start()
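
The one-connection-per-thread rule can also be packaged as a small helper. This is a minimal sketch, not the code tested above: consume_in_thread and its connect parameter are hypothetical names, and the keyword arguments (exchange_type, on_message_callback, auto_ack) assume pika 1.x, where they replace the type and no_ack arguments used in the code above.

```python
import threading


def consume_in_thread(connect, exchange, routing_key, on_message):
    """Start a consumer thread that owns its own connection.

    connect is a hypothetical zero-argument factory returning a new
    blocking connection. It is called inside the worker thread, so the
    connection is never shared with the calling (e.g. GUI) thread.
    """

    def worker():
        connection = connect()
        try:
            channel = connection.channel()
            # Assumption: pika 1.x keyword names.
            channel.exchange_declare(exchange=exchange, exchange_type='topic')
            result = channel.queue_declare(queue='', exclusive=True)
            queue_name = result.method.queue
            channel.queue_bind(exchange=exchange, queue=queue_name,
                               routing_key=routing_key)
            channel.basic_consume(queue=queue_name,
                                  on_message_callback=on_message,
                                  auto_ack=True)
            channel.start_consuming()  # blocks until consuming stops
        finally:
            if connection.is_open:
                connection.close()

    # daemon=True lets the application exit without waiting for the consumer.
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread
```

With pika installed, connect could be something like lambda: pika.BlockingConnection(pika.ConnectionParameters(host='localhost')), and on_message a function taking (ch, method, properties, body) as in the callback above.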
eandersson
  • thanks for sharing. If we added a second thread (separate connection, separate queue etc so def. no concurrency issues) - how would you gracefully handle the application shutdown? For example, is there any harm in just issuing a control-C (keyboard interrupt) - or does that result in both threads receiving the same shutdown sequence. Any `finally` logic etc to close the rabbit connections needed by the parent process (main)? – arcseldon Jan 29 '20 at 13:31
  • I looked on the rabbit mailing list (https://groups.google.com/forum/#!forum/rabbitmq-users) and not yet found an answer. your code above looks very close to what i need. Just looking for confirmation on the shutdown part. – arcseldon Jan 29 '20 at 13:33
  • And final question - do you have any thoughts on this solution (using processes instead of threads) - https://stackoverflow.com/a/45142386/1882064 Any shutdown specifics for multiprocesses? – arcseldon Jan 29 '20 at 13:45
5

Another approach is to pass channel.start_consuming as the thread's target and hand your callback to the consume method below. Usage: consume(channel=your_channel, callback=your_method, queue=your_queue)

import threading

def consume(self, *args, **kwargs):
    if "channel" not in kwargs \
            or "callback" not in kwargs \
            or "queue" not in kwargs \
            or not callable(kwargs["callback"]):
        return None

    channel = kwargs["channel"]
    callback = kwargs["callback"]
    queue = kwargs["queue"]
    channel.basic_consume(callback, queue=queue, no_ack=True)

    t1 = threading.Thread(target=channel.start_consuming)
    t1.start()
    t1.join(0)
dorintufar
1

The method suggested by dorintufar was useful for me, but I encountered a TypeError due to my version of the pika wrapper. If you get such an error, the advice is to change the parameter order in channel.basic_consume from:

channel.basic_consume(callback, queue=queue, no_ack=True)

to:

channel.basic_consume(queue, callback, no_ack=True)
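
One way to make the call independent of argument order is to use keywords. The sketch below assumes pika 1.x, where basic_consume takes queue first and names its other parameters on_message_callback and auto_ack (the latter replacing no_ack); register_consumer and pika_major_version are hypothetical names used only for illustration.

```python
def register_consumer(channel, queue, callback, pika_major_version):
    """Register callback on queue using the call style of the given pika version.

    pika_major_version is a hypothetical parameter; the caller is expected
    to derive it from the installed library.
    """
    if pika_major_version >= 1:
        # Assumption: pika 1.x signature, with keyword names spelled out.
        return channel.basic_consume(queue=queue,
                                     on_message_callback=callback,
                                     auto_ack=True)
    # Older call style, as used in the answer above.
    return channel.basic_consume(callback, queue=queue, no_ack=True)
```

The major version can be read from the installed library, for example with int(pika.__version__.split(".")[0]).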
SNygard
Rony Armon