1

I want to get the status of a task from an external DAG. I have the same tasks running in 2 different DAGs based on some conditions. So, I want to check the status of this task in DAG2 from DAG1. If the task status is 'running' in DAG2, then I will skip this task in DAG1.

I tried using:

dag_runs = DagRun.find(dag_id=dag_id,execution_date=exec_dt)
for dag_run in dag_runs:
  dag_run.state

I couldn't figure out if we can get task status using DagRun. If I use TaskDependencySensor, the DAG will have to wait until it finds the allowed_states of the task.

Is there a way to get the current status of a task in another DAG?

sks
  • 145
  • 1
  • 1
  • 9
  • Have you tried using [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#module-airflow.sensors.external_task) ? Check [this answer](https://stackoverflow.com/a/68409417/10569220) for a use case similiar to yours. – NicoE Nov 10 '21 at 15:05

1 Answers1

0

I used below code to get the status of a task from another DAG:

from airflow.api.common.experimental.get_task_instance import get_task_instance

def get_dag_state(execution_date, **kwargs):
    ti = get_task_instance('dag_id', 'task_id', execution_date)
    task_status = ti.current_state()
    return task_status

dag_status = BranchPythonOperator(
    task_id='dag_status',
    python_callable=get_dag_state,
    dag=dag
    )

More details can be found here

sks
  • 145
  • 1
  • 1
  • 9