I have been stuck on this issue for a while now.
I followed the steps and managed to bring up Celery with my tasks, and when I run Celery on its own it works as expected:
-------------- celery@xxxxcxxxxx.zzzz.org v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-2.6.32-573.22.1.el6.x86_64-x86_64-with-redhat-6.7-Santiago 2019-05-11 11:34:25
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7fcb942e93d0
- ** ---------- .> transport: amqp://guest:**@xx.xx.xx.xx:5672//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 5 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
. tasks.rehi
Then I configured the same settings in my airflow.cfg:
executor=CeleryExecutor
CELERY_BROKER_URL = 'amqp://guest:guest@xx.xx.xx:5672/'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/'
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////home/apservices/airflow/airflow.db
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@yy.yy.yy.yy:5432/airflow
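airflow.cfg is an INI-style file. Below is a minimal Python 3 sketch that uses only the standard-library configparser, not Airflow's own configuration loader (which builds on it). It shows how an INI reader treats lines like the ones above: key names are lowercased, quotes around a value are kept as part of the value, and a lookup for one key name does not see a differently named key. The [celery] section, the broker_url/result_backend key names and their MySQL defaults are assumptions based on the default airflow.cfg of Airflow 1.10.x. Check the template that ships with your installed version.

```python
import configparser

# Hypothetical airflow.cfg excerpt: [celery] defaults ASSUMED from the
# Airflow 1.10.x template, plus the two lines added in the question.
USER_CFG = """
[core]
executor = CeleryExecutor

[celery]
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
CELERY_BROKER_URL = 'amqp://guest:guest@xx.xx.xx:5672/'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/'
"""


def read_cfg(text):
    """Parse INI text; interpolation=None keeps '%' characters literal."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return parser


def celery_settings(parser):
    """Return (broker_url, result_backend) from [celery], None if missing."""
    return (
        parser.get("celery", "broker_url", fallback=None),
        parser.get("celery", "result_backend", fallback=None),
    )


parser = read_cfg(USER_CFG)

# A lookup by the expected key names still finds the MySQL defaults.
print(celery_settings(parser))
# ('sqla+mysql://airflow:airflow@localhost:3306/airflow', 'db+mysql://airflow:airflow@localhost:3306/airflow')

# The added lines are stored under different, lowercased keys.
print(sorted(parser["celery"]))
# ['broker_url', 'celery_broker_url', 'celery_result_backend', 'result_backend']

# The quotes become part of the value.
print(parser.get("celery", "celery_broker_url"))
# 'amqp://guest:guest@xx.xx.xx:5672/'
```

If the real file looks like this excerpt, a reader asking for [celery] broker_url would still get the MySQL default.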
However, when I run airflow worker, it just doesn't pick up the configuration given in airflow.cfg:
-------------- celery@xxx.xx.org v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-2.6.32-573.22.1.el6.x86_64-x86_64-with-redhat-6.7-Santiago 2019-05-11 12:00:57
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x7fa16cf99750
- ** ---------- .> transport: sqla+mysql://airflow:airflow@localhost:3306/airflow
- ** ---------- .> results: mysql://airflow:**@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
Complete stack trace:
[2019-05-11 12:00:56,936] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2019-05-11 12:00:56,936] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2019-05-11 12:00:56,938] {__init__.py:45} INFO - Using executor CeleryExecutor
[2019-05-11 12:00:57,160: CRITICAL/MainProcess] Unrecoverable error: TypeError("Invalid argument(s) 'visibility_timeout' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine. Please check that the keyword arguments are appropriate for this combination of components.",)
Traceback (most recent call last):
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", line 41, in start
    c.connection, on_decode_error=c.on_decode_error,
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in TaskConsumer
    **kw
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/messaging.py", line 386, in __init__
    self.revive(self.channel)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/messaging.py", line 408, in revive
    self.declare()
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/messaging.py", line 421, in declare
    queue.declare()
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/entity.py", line 608, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/entity.py", line 617, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/entity.py", line 652, in queue_declare
    nowait=nowait,
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
    self._new_queue(queue, **kwargs)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
    self._get_or_create(queue)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
    obj = self.session.query(self.queue_cls) \
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
    _, Session = self._open()
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
    engine = self._engine_from_config()
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
    return create_engine(conninfo.hostname, **transport_options)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 386, in create_engine
    return strategy.create(*args, **kwargs)
  File "/opt/cloudera/parcels/Anaconda-2.5.0/lib/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 144, in create
    engineclass.__name__))
TypeError: Invalid argument(s) 'visibility_timeout' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine. Please check that the keyword arguments are appropriate for this combination of components.
[2019-05-11 12:00:57,898] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2019-05-11 12:00:57,898] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2019-05-11 12:00:57,900] {__init__.py:45} INFO - Using executor CeleryExecutor
Starting flask
[2019-05-11 12:00:58,083] {_internal.py:122} INFO - * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
I am not worried about the TypeError: Invalid argument(s) 'visibility_timeout' for now, as I am sure the real problem is that the configuration in airflow.cfg is just not getting picked up.
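Airflow also checks environment variables named AIRFLOW__{SECTION}__{KEY} (double underscores) before airflow.cfg. Exporting the settings that way is one way to rule out a problem with the file itself. Below is a small Python 3 sketch that prints the export lines for the values used above. The section/key pairs (core/executor, celery/broker_url, celery/result_backend) are assumptions based on the Airflow 1.10.x configuration layout, and the helper names are hypothetical.

```python
import shlex


def airflow_env_var(section, key):
    """Build the AIRFLOW__{SECTION}__{KEY} name (hypothetical helper)."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


def export_lines(settings):
    """Turn {(section, key): value} into shell 'export' lines, safely quoted."""
    return [
        "export {}={}".format(airflow_env_var(section, key), shlex.quote(value))
        for (section, key), value in settings.items()
    ]


# Values copied from the question; the section/key names are ASSUMED (1.10.x).
SETTINGS = {
    ("core", "executor"): "CeleryExecutor",
    ("celery", "broker_url"): "amqp://guest:guest@xx.xx.xx:5672/",
    ("celery", "result_backend"): "redis://localhost:6379/",
}

for line in export_lines(SETTINGS):
    print(line)
# export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
# export AIRFLOW__CELERY__BROKER_URL=amqp://guest:guest@xx.xx.xx:5672/
# export AIRFLOW__CELERY__RESULT_BACKEND=redis://localhost:6379/
```

The exports only take effect in the shell (or service definition) that then launches airflow worker.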
Let me know what I am missing; any help is highly appreciated!
Cheers!