I have a DAG without a schedule (it is run manually as needed). It has many tasks. Sometimes I want to 'skip' some initial tasks by manually changing their state to SUCCESS. Changing the task state of a manually executed DAG fails, seemingly because of a bug in parsing the execution_date.

Is there another way to set task states individually for a manually executed DAG?

Example run below. The execution date of the task is 01-13T17:27:13.130427, and I believe the fractional seconds (microseconds) are not being parsed correctly.

Traceback

Traceback (most recent call last):
  File "/opt/conda/envs/jumpman_prod/lib/python3.6/site-packages/airflow/www/views.py", line 2372, in set_task_instance_state
    execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
  File "/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py", line 565, in _strptime_datetime
    tt, fraction = _strptime(data_string, format)
  File "/opt/conda/envs/jumpman_prod/lib/python3.6/_strptime.py", line 365, in _strptime
    data_string[found.end():])
ValueError: unconverted data remains: ..130427
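
The format string in the traceback, '%Y-%m-%d %H:%M:%S', has no directive for fractional seconds, which would explain the error. The following is a minimal sketch that reproduces the behaviour outside Airflow. The year in the sample value is an assumption (the question does not show it), and the second format string is only an illustration of how `%f` consumes the microseconds, not a description of how Airflow handles it.

```python
from datetime import datetime

# Hypothetical value: the year is not shown in the question, so 2018 is assumed.
execution_date = "2018-01-13 17:27:13.130427"

# Same format string as in the traceback: no directive for fractional seconds.
try:
    datetime.strptime(execution_date, "%Y-%m-%d %H:%M:%S")
    error_message = None
except ValueError as exc:
    error_message = str(exc)

# Adding %f lets strptime consume the microseconds as well.
parsed = datetime.strptime(execution_date, "%Y-%m-%d %H:%M:%S.%f")

print(error_message)
print(parsed.microsecond)
```

Manually triggered runs get an execution date with microsecond precision, while scheduled runs usually fall on whole seconds, which may be why only manual runs hit this.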

stefanobaghino
kri

2 Answers

To accomplish this, you may want to use branching, which, as the name suggests, allows you to follow different execution paths according to some conditions, just like an if in any programming language.

You can use the BranchPythonOperator (documented here) to attain this goal: the idea is that this operator is configured by a python_callable, a function that outputs the task_id to execute next (which should, of course, be a task which is directly downstream from the BranchPythonOperator itself).

Using branching will set the skipped tasks to the proper state automatically, as mentioned in the documentation:

All other “branches” or directly downstream tasks are marked with a state of skipped so that these paths can’t move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.

The resulting DAG would look something like the following:

[Image: branching]
(source: apache.org)

Branching is documented here, in the official Apache Airflow documentation.
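
As a rough sketch of how this could apply to the question, assuming Airflow 1.x import paths: a `BranchPythonOperator` at the start of the DAG either follows the initial tasks or a dummy task that bypasses them. All task ids, the DAG id, and the `skip_initial` key read from the run's `conf` are hypothetical names chosen for illustration. The Airflow imports sit inside `build_dag` only so that the branching function can be tried on its own.

```python
from datetime import datetime


def choose_branch(**context):
    """Return the task_id of the directly downstream task to follow."""
    dag_run = context.get("dag_run")
    # conf is None when the run was triggered without a configuration.
    conf = (dag_run.conf if dag_run is not None else None) or {}
    # "skip_initial" is a hypothetical key, not an Airflow convention.
    return "skip_initial" if conf.get("skip_initial") else "initial_1"


def build_dag():
    # Airflow 1.x import paths (assumption; they differ in later versions).
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    dag = DAG(
        "manual_dag_with_skip",      # hypothetical DAG id
        schedule_interval=None,      # no schedule: triggered manually
        start_date=datetime(2018, 1, 1),
    )
    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=choose_branch,
        provide_context=True,        # pass dag_run and friends as kwargs
        dag=dag,
    )
    initial_1 = DummyOperator(task_id="initial_1", dag=dag)
    initial_2 = DummyOperator(task_id="initial_2", dag=dag)
    skip_initial = DummyOperator(task_id="skip_initial", dag=dag)
    # one_success lets "main" run when either path reaches it; with the
    # default all_success rule the skipped path would skip "main" too.
    main = DummyOperator(task_id="main", trigger_rule="one_success", dag=dag)

    branch >> initial_1 >> initial_2 >> main
    branch >> skip_initial >> main
    return dag
```

In a DAG file, the result would be assigned at module level (`dag = build_dag()`) so that Airflow can discover it. With this layout, triggering the run with a configuration such as `{"skip_initial": true}` should leave `initial_1` and `initial_2` in the skipped state rather than SUCCESS.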

Glorfindel
stefanobaghino
This does not work from the Task Instances page, but you can do it from another page:
- open the DAG's graph view
- select the run you need (screen 1) and click Go
- select the task you need
- in the popup window, click Mark success (screen 2)
- then confirm.

PS: this applies to Airflow 1.9.

Screen 1: [screenshot]

Screen 2: [screenshot]

mastak