8

I'm using Celery with an AMQP broker to call tasks, but the response needs to be passed back with a different queue architecture than Celery uses, so I want to pass the messages back using Kombu only. I've been able to do this, but I'm creating a new connection every time. Does Celery use a broker connection pool, and if so, how do you access it?

wolverdude
  • 1,583
  • 1
  • 13
  • 20

2 Answers2

15

It took a lot of searching because Celery's documentation is... wonderful... but I found the answer.

Celery does use a broker connection pool for calling subtasks. The celery application has a pool attribute that you can access through <your_app>.pool or celery.current_app.pool. You can then grab a connection from the pool using pool.acquire().

Community
  • 1
  • 1
wolverdude
  • 1,583
  • 1
  • 13
  • 20
  • I am in similar situation. Is the following code correct - http://dpaste.com/2SSJ8W0 ? I appreciate any help. Thanks! – avi Mar 02 '16 at 08:38
0

Also, it's possible by using Bootsteps https://docs.celeryproject.org/en/stable/userguide/extending.html

Let me copy-paste code from documentation (e.g. prevent 404 error in future)

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('world!')
ColCh
  • 3,016
  • 3
  • 20
  • 20