0

I am trying to set up celery broker with cloud AMQP.

Since cloud AMQP service only provide Java SDK, so I rewrote the encryption code with Python, and connection works fine.

However, there is a problem producer sending task: connection with cloud AMQP service will be invalid after my project started for a while, because Celery amqp pruducer/connection can not refresh connection param. Error is 530 Time Expired.which means password invalid

Here's my celery config:

task_ignore_result=True,
task_default_queue='default',
task_default_exchange='default',
result_exchange='default',
task_default_exchange_type='direct',
broker_login_method='PLAIN',
task_create_missing_queues=True,
task_serializer='json',
result_serializer='json',
result_expire=1,
accept_content=['json'],
broker_connection_retry=False,
task_queues=(
    Queue(name='tesu', exchange=Exchange(name='test', type='direct'), routing_key='test'),
),
task_routes=(
    {'tasks.add': {
        'queue': 'test_lukou',
        'routing_key': 'test_lukou'
    }},
),
broker_url='amqp://{username}:{password}@{host}:{port}/{virtual_host}'.format(username=provider.get_user_name(),
                                                                              password=provider.get_password(),
                                                                              host=PUBLIC_HOST,
                                                                              port=PORT,
                                                                              virtual_host=VHOST_NAME),
broker_pool_limit=0,
broker_heartbeat=10,
broker_connection_timeout=30, 
result_backend=None, 
event_queue_expires=60,  
worker_prefetch_multiplier=1,

I updated broker_url when sending task, but amqp connection param is NOT updated.

enviroment:
Python 2.7 kombu 4.0.2 celery 4.1.0 rabbitmq 0.2.0

Does Celery provide a way updating amqp connection param on the runtime?
Can anyone gives me advice? Thanks in advance..

some links:

Celery creating a new connection for each task

https://www.cloudamqp.com/docs/celery.html

addtion:
DEBUG result
amqp connection password(never changed)
celery updated conf

Jx Liu
  • 13
  • 4

1 Answers1

0

Sloved

Set Celery amqp by creating new producer pool for every task, like this:

class TestAMQP(AMQP):
    @property
    def producer_pool(self):
        self._producer_pool = pools.producers[
            self.app.connection_for_write()]
        self._producer_pool.limit = self.app.pool.limit
        return self._producer_pool

app = Celery('test', include=['tasks'], amqp=TestAMQP)

Hope this can help someone who stucked in similar problem.

Jx Liu
  • 13
  • 4