I would like to extract all execution times of a particular task in an Airflow DAG. I would prefer to do it by writing another DAG.

I have used the following DAG to extract the status and execution time of another DAG:


import pandas as pd
import numpy as np
import pandas_gbq
from google.cloud import storage as gcs
from google.cloud import bigquery
from airflow.models import DagRun

dag_id = 'my_dag'
dag_runs = DagRun.find(dag_id=dag_id)

# Collect the state and execution date of every run of the DAG
arr = []
arr1 = []

for dag_run in dag_runs:
    arr.append(dag_run.state)
    arr1.append(dag_run.execution_date)

dag_info = {'time': arr1, 'dag_status': arr}

df = pd.DataFrame(dag_info)

## Keep failed and successful DAG runs
df_status = df[(df.dag_status == "failed") | (df.dag_status == "success")]

df_status.loc[df_status['dag_status'] == 'success', 'flag'] = 0
df_status.loc[df_status['dag_status'] == 'failed', 'flag'] = 1

### Code to save the table in BigQuery
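
For the save step, a minimal sketch using the pandas_gbq import already at the top of the DAG; the destination table and project id below are placeholders, not names from the original post:

# Hypothetical destination table and project id; replace with your own
pandas_gbq.to_gbq(
    df_status,
    destination_table='my_dataset.dag_status',
    project_id='my-gcp-project',
    if_exists='replace',  # overwrite the table on each run
)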

I would like to do the same, but this time extract task information for 'my_dag'. I have tried the solution given in Status of Airflow task within the dag, but it returns "None", although I know the DAG and the task are running.

def task_status_check(**kwargs):

    ##### TESTING #####

    import datetime
    from datetime import timedelta

    from airflow import configuration as conf
    from airflow.models import DagBag, TaskInstance

    my_date = datetime.datetime(2020, 9, 28)

    my_dag_id = 'my_dag'
    my_task_id = 'my_task'

    # Load the DAG from the DAGs folder and look up the task
    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[my_dag_id]
    my_task = check_dag.get_task(my_task_id)

    # Probe task instances at two-minute steps after my_date
    for n in range(1, 500, 2):
        time_delta = timedelta(minutes=n)
        my_date_1 = my_date + time_delta
        ti = TaskInstance(my_task, my_date_1)

        print("######################")
        print(ti.current_state())

Any help would be highly appreciated.

Thank you

moeen
  • What does `TaskInstance(my_task, my_date_1)` return? Can you check this value for some of the iterations in the loop with `print(ti)`? – Nick_Kh Sep 29 '20 at 16:19
  • Can you also provide the Composer image version? – Nick_Kh Sep 29 '20 at 16:44
  • @Nick_Kh Thanks for the reply. I did check `print(ti)`. Here is one example output, so it makes sense that `print(ti.current_state())` is None, because it's [None]. I was wondering if it's possible to get the task information without specifying a time, as I did for the DAG. The Composer version is "composer-1.7.3-airflow-1.10.2". Thanks – moeen Sep 29 '20 at 16:56

1 Answer


I suspect the issue here is in the TaskInstance() model, not in the custom code logic enclosed in the task_status_check() function. Basically, the TaskInstance() class offers a variety of Airflow task-management features by leveraging the SQLAlchemy ORM, which queries the Airflow metadata DB and fetches records from the task_instance SQL table; looking through the source code you can find #L203, which reflects this.
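As an illustration of that ORM path, here is a rough sketch (assuming Airflow 1.10.x, as in Composer 1.7.3) that queries the task_instance table directly through the Airflow session; 'my_dag' and 'my_task' are the placeholder ids from the question. Because it reads the table itself, it also exposes the per-run duration the question was originally after, without supplying an execution date:

from airflow.models import TaskInstance
from airflow.settings import Session

# Query the task_instance metadata table via the SQLAlchemy session
session = Session()
tis = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == 'my_dag',    # placeholder DAG id
        TaskInstance.task_id == 'my_task',  # placeholder task id
    )
    .all()
)
for ti in tis:
    # state, execution_date and duration (in seconds) come straight from the table
    print(ti.execution_date, ti.state, ti.duration)
session.close()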

I've tried your code in a very similar scenario and faced the same None returned state. Reviewing the user's efforts mentioned in the Stack thread linked from the initial question and digging deeper into the problem, I adjusted get_task_instance() to check the behavior, pointing this function at the state of a particular Airflow task. Since get_task_instance() is part of the experimental API package, it seemingly invokes the TaskInstance() class to discover the task state:

def task_check(**kwargs):
    import datetime
    from datetime import timezone
    from airflow.api.common.experimental.get_task_instance import get_task_instance

    # Replace with the execution date of the run to inspect
    my_date = datetime.datetime(2020, 9, 28, 0, 0, 0)
    my_date = my_date.replace(tzinfo=timezone.utc)

    my_dag_id = "Dag_id"
    my_task_id = "Task_id"
    ti = get_task_instance(my_dag_id, my_task_id, my_date)

    # Returned by the PythonOperator so the state appears in the task log
    return ti.current_state()

I've checked that the request to the Airflow DB was successful; however, the get_task_instance function returns the same None state:

{python_operator.py:114} INFO - Done. Returned value was: None

Meanwhile, doing further research into other methods for extracting the state of Airflow tasks, I found that the following handled the job just fine.

  • The Airflow command line, adjusted to run on one of the Composer workers:

    kubectl -it exec $(kubectl get po -l run=airflow-worker -o jsonpath='{.items[0].metadata.name}' \
        -n $(kubectl get ns | grep composer | awk '{print $1}')) -n $(kubectl get ns | grep composer | awk '{print $1}') \
        -c airflow-worker -- airflow task_state <Dag_ID> <Task_ID> 2020-09-27T23:59:21+00:00
    
  • Querying the task_instance table in the Airflow metadata MySQL database accordingly:

    SELECT task_id, state, execution_date
    FROM task_instance
    WHERE dag_id = 'dag_id'
      AND DATE(execution_date) = 'execution_date'
      AND task_id = 'task_id';
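
If connecting to MySQL directly is inconvenient (see the comments below), here is a sketch of the same query run from inside a DAG through the Airflow session; the ids and date are placeholders:

from sqlalchemy import text
from airflow.settings import Session

# Run the raw SQL through the existing Airflow metadata DB connection
session = Session()
rows = session.execute(
    text(
        "SELECT task_id, state, execution_date "
        "FROM task_instance "
        "WHERE dag_id = :dag_id "
        "AND DATE(execution_date) = :exec_date "
        "AND task_id = :task_id"
    ),
    {"dag_id": "my_dag", "exec_date": "2020-09-28", "task_id": "my_task"},
)
for row in rows:
    print(row)
session.close()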
Nick_Kh
  • Nick_Kh, thanks for the deep dive into the issue. I will try the other methods you mentioned that are working. I do not know exactly where I can execute the SQL query on GCP Composer; I would appreciate your input. Thanks again – moeen Sep 30 '20 at 19:30
  • I believe I've already explained the process of connecting to the MySQL Airflow metadata schema in [this](https://stackoverflow.com/a/63882534/9928809) answer; I hope it will help you overcome the issue. If you find my answer gave useful detail on the problem, then consider [accepting/upvoting](https://stackoverflow.com/help/why-vote) it, helping other contributors with their research. – Nick_Kh Oct 01 '20 at 09:51
  • Hi @Nick_Kh, thanks for pointing me to the related post. I will look into it. As I still could not come up with a solution, I upvoted your effort. Thanks again – moeen Oct 02 '20 at 05:41