I am using airflow version 1.10.11
, and have a simple test DAG that looks like this:
from datetime import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from backend.logic.games import refresh_game_data
dag = DAG(
dag_id='update_game_data',
schedule_interval='@once',
start_date=dt.utcnow()
)
def refresh_game_with_context(**kwargs):
game_id = kwargs['dag_run'].conf['game_id']
refresh_game_data(game_id)
refresh_game_data_task = PythonOperator(
task_id="refresh_game_data",
python_callable=refresh_game_with_context,
dag=dag,
provide_context=True
)
refresh_game_data_task
When I manually call the task via the UI it succeeds without any issue and produces the expected output:
When I call the CLI I get the following output:
root@b324f7099e97:/home/backend# airflow trigger_dag 'update_game_data' --conf '{"game_id":3}'
[2020-07-30 02:46:06,264] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:06,267] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:06,267] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
result = self._query(query)
Created <DagRun update_game_data @ 2020-07-30 02:46:07+00:00: manual__2020-07-30T02:46:07+00:00, externally triggered: True>
Similar result with the local client in python:
In [1]: from airflow.api.client.local_client import Client
...:
...: afc = Client(None, None)
...: res = afc.trigger_dag(dag_id='update_game_data', conf={"game_id": 3})
[2020-07-30 02:46:43,612] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:43,618] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:43,619] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
result = self._query(query)
However in neither case does the task actually get sent to the airflow celery worker, as it does when triggering via the UI. What's strange here is that in both cases the UI shows that a successful DAG run just happened, but with no log data and without marking the task inside the DAG as a success:
I'm sure that this is where the issue lies, but could use a pointer in the right direction. I am using redis as my task backend, RMQ as my broker, and MySQL as my meta DB as described here.