
I need the status of a task, e.g. whether it is running, up_for_retry or failed, from within the same DAG. I tried to get it using the code below, but got no output...

Auto = PythonOperator(
    task_id='test_sleep',
    python_callable=execute_on_emr,
    op_kwargs={'cmd':'python /home/hadoop/test/testsleep.py'},
    dag=dag)

logger.info(Auto)

The intention is to kill certain running tasks once a particular task on airflow completes.

The question is: how do I get the state of a task, i.e. whether it is running, failed or successful?

Chetan J

4 Answers


I am doing something similar. For one task, I need to check whether the previous 10 runs of another task were successful. taky2 sent me down the right path. It is actually fairly easy:

from airflow.models import TaskInstance
ti = TaskInstance(*your_task*, execution_date)
state = ti.current_state()

As I want to check that within the DAG, it is not necessary to specify the dag. I simply created a function to loop through the past n days and check the status.

from datetime import timedelta
from airflow.models import TaskInstance

def check_status(**kwargs):
    last_n_days = 10
    for n in range(last_n_days):
        date = kwargs['execution_date'] - timedelta(days=n)
        # my_task is the task object you defined within the DAG, not the task_id
        # (in the example below: check_success_task rather than 'check_success_days_before')
        ti = TaskInstance(*my_task*, date)
        state = ti.current_state()
        if state != 'success':
            raise ValueError('Not all previous tasks successfully completed.')
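The date arithmetic in that loop can be sanity-checked in isolation (pure Python, no Airflow required); dates_to_check is a hypothetical helper for illustration, not part of the Airflow API:

```python
from datetime import datetime, timedelta

def dates_to_check(execution_date, last_n_days=10):
    # Execution dates for the current run and the previous n-1 days,
    # mirroring the loop in check_status.
    return [execution_date - timedelta(days=n) for n in range(last_n_days)]
```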

When you call the function, make sure to set provide_context=True.

check_success_task = PythonOperator(
    task_id='check_success_days_before',
    python_callable= check_status,
    provide_context=True,
    dag=dag
)

UPDATE: When you want to reference a task from another DAG, you need to load it like this:

from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance

dag_folder = conf.get('core','DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[*my_dag_id*]
my_task = check_dag.get_task(*my_task_id*)
ti = TaskInstance(my_task, date)

Apparently there is now also an API call that does the same thing:

from airflow.api.common.experimental.get_task_instance import get_task_instance
ti = get_task_instance(*my_dag_id*, *my_task_id*, date)
Krischl
  • You're an Airflow Hero dude thank you for providing such detail and especially thank you for posting that Update with the additional code. You're a life saver! – Kyle Bridenstine Apr 27 '19 at 02:54
  • Thanks @Krischl for this reply. When calling a task from another DAG, how do you determine its `date`? I tried using `check_dag.get_last_dagrun().execution_date`, but `get_last_dagrun` returns None. – A. Debugne Jan 21 '20 at 08:39
  • @A. Debugne I don't see why your approach shouldn't work unless your DAG has never run. You might also want to check `check_dag.latest_execution_date`. If you know your DAG, you might also work around it manually by changing the datetime object: `date = date.replace(hour=*my_hour*, minute=*my_minute*)` – Krischl Jan 24 '20 at 10:06
  • How do you get the status as a string for use in, say, an email operator, or just to print to logs? – Maile Cupo Mar 01 '22 at 23:37

Take a look at the code responsible for the command line interface operation suggested by Priyank.

https://github.com/apache/incubator-airflow/blob/2318cea74d4f71fba353eaca9bb3c4fd3cdb06c0/airflow/bin/cli.py#L581

def task_state(args):
    dag = get_dag(args)
    task = dag.get_task(task_id=args.task_id)
    ti = TaskInstance(task, args.execution_date)
    print(ti.current_state())

Hence, it seems you should easily be able to accomplish this within your DAG codebase using similar code.

Alternatively, you could execute these CLI operations from within your code using Python's subprocess library.
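A minimal sketch of that subprocess approach. The get_task_state helper and its airflow_bin parameter are assumptions for illustration; it relies on `airflow task_state` printing the state as the last line of its output:

```python
import subprocess

def get_task_state(dag_id, task_id, execution_date, airflow_bin="airflow"):
    # Run `airflow task_state <dag_id> <task_id> <execution_date>` and
    # return the last line of stdout, which is the state string.
    result = subprocess.run(
        [airflow_bin, "task_state", dag_id, task_id, execution_date],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip().splitlines()[-1]
```

The airflow_bin parameter also makes the parsing testable without an Airflow installation, by substituting a harmless command such as echo; in a real deployment the return value would be e.g. "success" or "running".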

taky2

Okay, I think I know what you're doing and I don't really agree with it, but I'll start with an answer.

A straightforward, but hackish, way would be to query the task_instance table. I'm on Postgres, but the structure should be the same. Start by grabbing the task_id and state of the task you're interested in with a db call.

SELECT task_id, state
FROM task_instance
WHERE dag_id = '<dag_id_attrib>'
  AND execution_date = '<execution_date_attrib>'
  AND task_id = '<task_to_check>'

That should give you the state (and name, for reference) of the task you're trying to monitor. State is stored as a simple lowercase string.
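A sketch of running that query from Python. The answer assumes Postgres; the snippet below uses an in-memory SQLite stand-in purely to illustrate the query shape, with a hand-built table following the dag_id/task_id/execution_date/state columns used above:

```python
import sqlite3

# Stand-in for Airflow's metadata database (Postgres in the answer above).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
conn.execute(
    "INSERT INTO task_instance VALUES "
    "('my_dag', 'test_sleep', '2017-05-01', 'running')"
)

# The same SELECT as above, parameterized instead of string-interpolated.
row = conn.execute(
    """SELECT task_id, state
       FROM task_instance
       WHERE dag_id = ? AND execution_date = ? AND task_id = ?""",
    ("my_dag", "2017-05-01", "test_sleep"),
).fetchone()
```

Against the real metadata database you would use the appropriate driver (e.g. psycopg2) with the same parameterized query.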

apathyman
  • I am creating an SSH connection and executing scripts on a remote server. When I "clear" a running task, the task stops but the script continues to run on the remote server. Therefore I thought of checking the status of the task to kill the script. Is there a better approach? – Chetan J May 03 '17 at 05:18
  • I guess I just don't like the idea of having a busy wait taking up a task slot on the worker. This job seems like a single discrete step (spin up, execution, terminate) from the airflow perspective. Are you running multiple jobs on the same cluster while it is up? I don't really have a better solution atm (although I'm also building an EMR operator/hook, so maybe I will). This question, and my answer (regular busy-checking of a database table) just raises a lot of red flags to me. – apathyman May 03 '17 at 21:57
  • On the cluster I will be submitting one job at a time but in a situation where i need to kill/stop a running task, from the UI, I am unable to kill/stop the task that is running on the remote server. Reason being, when the airflow task runs, the task created another process id for the ssh connection and so when i "clear" the task from the Airflow UI, the parent process gets killed where as the SSh connection is still up and the job is executing. I am not sure how to handle the situation. – Chetan J May 04 '17 at 06:24

You can use the command line interface for this:

 airflow task_state [-h] [-sd SUBDIR] dag_id task_id execution_date

For more on this you can refer to the official Airflow documentation:

http://airflow.incubator.apache.org/cli.html

Priyank Mehta