I'm using Celery with an AMQP broker to call tasks, but the response needs to be passed back with a different queue architecture than Celery uses, so I want to pass the messages back using Kombu only. I've been able to do this, but I'm creating a new connection every time. Does Celery use a broker connection pool, and if so, how do you access it?
2 Answers
It took a lot of searching because Celery's documentation is... wonderful... but I found the answer.
Celery does use a broker connection pool for calling subtasks. The Celery application has a pool attribute that you can access through `<your_app>.pool` or `celery.current_app.pool`. You can then grab a connection from the pool using `pool.acquire()`.

wolverdude
I am in a similar situation. Is the following code correct: http://dpaste.com/2SSJ8W0 ? I appreciate any help. Thanks! – avi Mar 02 '16 at 08:38
This is also possible using Bootsteps: https://docs.celeryproject.org/en/stable/userguide/extending.html
I'm copying the code from the documentation here in case the link breaks in the future:

    from celery import Celery
    from celery import bootsteps
    from kombu import Consumer, Exchange, Queue

    my_queue = Queue('custom', Exchange('custom'), 'routing_key')

    app = Celery(broker='amqp://')


    class MyConsumerStep(bootsteps.ConsumerStep):

        def get_consumers(self, channel):
            return [Consumer(channel,
                             queues=[my_queue],
                             callbacks=[self.handle_message],
                             accept=['json'])]

        def handle_message(self, body, message):
            print('Received message: {0!r}'.format(body))
            message.ack()


    app.steps['consumer'].add(MyConsumerStep)


    def send_me_a_message(who, producer=None):
        with app.producer_or_acquire(producer) as producer:
            producer.publish(
                {'hello': who},
                serializer='json',
                exchange=my_queue.exchange,
                routing_key='routing_key',
                declare=[my_queue],
                retry=True,
            )


    if __name__ == '__main__':
        send_me_a_message('world!')

ColCh