
I am using Airflow version 1.10.11 and have a simple test DAG that looks like this:

from datetime import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from backend.logic.games import refresh_game_data


dag = DAG(
    dag_id='update_game_data',
    schedule_interval='@once',
    start_date=dt.utcnow()
)


def refresh_game_with_context(**kwargs):
    game_id = kwargs['dag_run'].conf['game_id']
    refresh_game_data(game_id)


refresh_game_data_task = PythonOperator(
    task_id="refresh_game_data",
    python_callable=refresh_game_with_context,
    dag=dag,
    provide_context=True
)

refresh_game_data_task

When I manually call the task via the UI it succeeds without any issue and produces the expected output:

[screenshot: successful task run with the expected output]

When I call the CLI I get the following output:

root@b324f7099e97:/home/backend# airflow trigger_dag 'update_game_data' --conf '{"game_id":3}'
[2020-07-30 02:46:06,264] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:06,267] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:06,267] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
  result = self._query(query)
Created <DagRun update_game_data @ 2020-07-30 02:46:07+00:00: manual__2020-07-30T02:46:07+00:00, externally triggered: True>

I get a similar result with the local client in Python:

In [1]: from airflow.api.client.local_client import Client 
   ...:  
   ...: afc = Client(None, None) 
   ...: res = afc.trigger_dag(dag_id='update_game_data', conf={"game_id": 3})                                                                        
[2020-07-30 02:46:43,612] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:43,618] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:43,619] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
  result = self._query(query)
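
One way to check whether the trigger at least recorded the run and its conf in the metadata DB; a minimal sketch, assuming DagRun.find (which is present in 1.10):

from airflow.models import DagRun

# List every run of the DAG along with the conf it was triggered with.
runs = DagRun.find(dag_id='update_game_data')
for run in runs:
    print(run.run_id, run.state, run.conf)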

However, in neither case does the task actually get sent to the Airflow Celery worker, as it does when triggering via the UI. What's strange is that in both cases the UI shows that a successful DAG run just happened, but with no log data and without marking the task inside the DAG as a success:

[screenshot: DAG run shown as successful in the UI, but the task has no logs and is not marked as a success]

I'm sure this is where the issue lies, but I could use a pointer in the right direction. I am using Redis as my result backend, RabbitMQ as my broker, and MySQL as my metadata DB, as described here.


1 Answer


I started with the externally triggered DAG sample and got it working without any issues. When I started swapping in bits and pieces of my own logic, however, it started breaking. I then found this post, which warns against using dynamic start dates. Interestingly, while dynamic start dates can apparently throw off the Airflow scheduler, the externally triggered example uses one and works.

I tried both datetime.now() and datetime.utcnow(), and in both cases got an error saying that the task's execution date needed to be greater than the start date. I tried converting the timestamp to a date, but Airflow didn't like the resulting timestamp having no TZ info. What finally fixed it was giving up on the call to utcnow()/now() altogether and setting up my DAG with a fixed start date as follows:

from datetime import datetime as dt
...
dag = DAG(
    dag_id='update_game_data',
    schedule_interval=None,
    start_date=dt(2000, 1, 1)
)
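
With the fixed start date in place, the same trigger command from the question sends the task to the worker as expected:

airflow trigger_dag 'update_game_data' --conf '{"game_id":3}'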

By fixing the start date and leaving out the TZ information, I was able to get this working. Why exactly this works, and why the DAG above works via the UI but not via external triggering, is a question I'll leave to the Airflow ninjas among us. This is fine for one-off, externally triggered DAGs. We'll see whether I run into the same issue if/when our application starts using Airflow as a beat scheduler; for now we're using Celery in conjunction with Redis for managing job schedules.
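
If a timezone-aware start date is ever needed, a variant I have not tested is to attach a pendulum timezone (pendulum ships with Airflow 1.10, and this is the pattern its timezone docs use):

import pendulum
from datetime import datetime as dt

from airflow import DAG

# Untested sketch: a fixed start date made timezone-aware with pendulum.
utc = pendulum.timezone("UTC")

dag = DAG(
    dag_id='update_game_data',
    schedule_interval=None,
    start_date=dt(2000, 1, 1, tzinfo=utc)
)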
