I would like to extract all the execution times of a particular task in an Airflow DAG. I would prefer to do this by writing another DAG.
I have used the following DAG to extract the status and execution time of another DAG:
import pandas as pd
import numpy as np
import pandas_gbq
from google.cloud import storage as gcs
from google.cloud import bigquery
from airflow.models import DagRun

def dag_status_check(**kwargs):
    dag_id = 'my_dag'
    dag_runs = DagRun.find(dag_id=dag_id)
    # Declare empty lists for run states and execution dates
    arr = []
    arr1 = []
    for dag_run in dag_runs:
        arr.append(dag_run.state)
        arr1.append(dag_run.execution_date)
    dag_info = {'time': arr1, 'dag_status': arr}
    df = pd.DataFrame(dag_info)
    ## Keep failed and successful dag runs
    df_status = df[(df.dag_status == "failed") | (df.dag_status == "success")]
    df_status.loc[df_status['dag_status'] == 'success', 'flag'] = 0
    df_status.loc[df_status['dag_status'] == 'failed', 'flag'] = 1
    ### Code to save the table in Bigquery
    return None
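For completeness, the "save the table in BigQuery" step I left out above is essentially just a pandas_gbq call; the project, dataset and table names below are placeholders, not the real ones:

    # Rough sketch of the BigQuery save step; 'my-project' and
    # 'my_dataset.dag_status' are placeholder names.
    pandas_gbq.to_gbq(df_status,
                      destination_table='my_dataset.dag_status',
                      project_id='my-project',
                      if_exists='replace')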
I would like to do the same, but this time extract task-level information for 'my_dag'. I have tried the solution given in Status of Airflow task within the dag, but it returns "None", although I know the task and the DAG are running.
from datetime import timedelta
from airflow.configuration import conf
from airflow.models import DagBag, TaskInstance

def task_status_check(**kwargs):
    ##### TESTING #####
    import pandas as pd
    import datetime
    my_date = datetime.datetime(2020, 9, 28)
    my_dag_id = 'my_dag'
    my_task_id = 'my_task'
    # Load the DAG and the task from the DAGs folder
    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[my_dag_id]
    my_task = check_dag.get_task(my_task_id)
    # Probe candidate execution dates in 2-minute steps
    for n in range(1, 500, 2):
        time_delta = timedelta(minutes=n)
        my_date_1 = my_date + time_delta
        ti = TaskInstance(my_task, my_date_1)
        print("######################")
        print(ti.current_state())
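What I am ultimately hoping to end up with is something like the DAG-level snippet above, but per task instance. A rough sketch of the idea (I am not sure whether DagRun.get_task_instance and the start_date/end_date/duration attributes are the right way to get this, which is partly what I am asking):

    # Sketch only: pull my_task's timing for each run of my_dag
    task_times = []
    for dag_run in DagRun.find(dag_id='my_dag'):
        ti = dag_run.get_task_instance('my_task')
        if ti is not None:
            task_times.append({'execution_date': dag_run.execution_date,
                               'task_state': ti.state,
                               'start': ti.start_date,
                               'end': ti.end_date,
                               'duration': ti.duration})
    task_df = pd.DataFrame(task_times)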
Any help will be highly appreciated.
Thank you