I am trying to set up celery broker with cloud AMQP.
Since cloud AMQP service only provide Java SDK, so I rewrote the encryption code with Python, and connection works fine.
However, there is a problem producer sending task: connection with cloud AMQP service will be invalid after my project started for a while, because Celery amqp pruducer/connection can not refresh connection param. Error is 530 Time Expired
.which means password invalid
Here's my celery config:
task_ignore_result=True,
task_default_queue='default',
task_default_exchange='default',
result_exchange='default',
task_default_exchange_type='direct',
broker_login_method='PLAIN',
task_create_missing_queues=True,
task_serializer='json',
result_serializer='json',
result_expire=1,
accept_content=['json'],
broker_connection_retry=False,
task_queues=(
Queue(name='tesu', exchange=Exchange(name='test', type='direct'), routing_key='test'),
),
task_routes=(
{'tasks.add': {
'queue': 'test_lukou',
'routing_key': 'test_lukou'
}},
),
broker_url='amqp://{username}:{password}@{host}:{port}/{virtual_host}'.format(username=provider.get_user_name(),
password=provider.get_password(),
host=PUBLIC_HOST,
port=PORT,
virtual_host=VHOST_NAME),
broker_pool_limit=0,
broker_heartbeat=10,
broker_connection_timeout=30,
result_backend=None,
event_queue_expires=60,
worker_prefetch_multiplier=1,
I updated broker_url when sending task, but amqp connection param is NOT updated.
enviroment:
Python 2.7
kombu 4.0.2
celery 4.1.0
rabbitmq 0.2.0
Does Celery provide a way updating amqp connection param on the runtime?
Can anyone gives me advice? Thanks in advance..
some links:
Celery creating a new connection for each task
https://www.cloudamqp.com/docs/celery.html
addtion:
DEBUG result
amqp connection password(never changed)
celery updated conf