I am trying to trigger one DAG from another DAG (target_dag from parent_dag).
My parent_dag code is:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def read_metadata(**kwargs):
    asqldb_kv = Variable.get("asql_kv")
    # perform some operations based on asqldb_kv and populate the result_dictionary list
    if len(result_dictionary) > 0:
        Variable.set("num_runs", len(result_dictionary))
        ti = kwargs['ti']
        ti.xcom_push(key='res', value=result_dictionary)

default_args = {
    'start_date': datetime(year=2021, month=6, day=19),
    'provide_context': True
}

with DAG(
    dag_id='parent_dag',
    default_args=default_args,
    schedule_interval='@once',
    description='Test Trigger DAG'
) as dag:
    trigger = TriggerDagRunOperator(
        task_id="test_trigger_dagrun",
        python_callable=read_metadata,
        trigger_dag_id="target_dag"
    )
I am getting the below error:
airflow.exceptions.AirflowException: Invalid arguments were passed to TriggerDagRunOperator (task_id: test_trigger_dagrun). Invalid arguments were:
**kwargs: {'python_callable': <function read_metadata at 0x7ff5f4159620>}
Any help appreciated.
Edit:

python_callable is deprecated in TriggerDagRunOperator as of Airflow 2.0.
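For context, this is the kind of split I am considering now that the callable can no longer be passed to the trigger: the Synapse lookup runs in its own PythonOperator, which is then chained to a plain TriggerDagRunOperator. A minimal sketch (the body of read_metadata is still a placeholder, as above):

```python
from datetime import datetime

from airflow.models import DAG, Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def read_metadata(**kwargs):
    # Placeholder: query Azure Synapse using the "asql_kv" connection details
    # and populate result_dictionary from the query results.
    result_dictionary = []
    if len(result_dictionary) > 0:
        Variable.set("num_runs", len(result_dictionary))
        kwargs['ti'].xcom_push(key='res', value=result_dictionary)


with DAG(
    dag_id='parent_dag',
    start_date=datetime(2021, 6, 19),
    schedule_interval='@once',
    description='Test Trigger DAG',
) as dag:
    # Step 1: run the metadata lookup as a normal Python task.
    read = PythonOperator(
        task_id='read_metadata',
        python_callable=read_metadata,
    )

    # Step 2: trigger target_dag; in Airflow 2 this operator only
    # takes trigger arguments, no python_callable.
    trigger = TriggerDagRunOperator(
        task_id='test_trigger_dagrun',
        trigger_dag_id='target_dag',
    )

    read >> trigger
```

I am not sure if this is the idiomatic replacement, but it avoids the invalid-argument error while keeping the same ordering (lookup first, trigger second).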
My requirement is:

I need to access Azure Synapse and retrieve a variable (say 3). Based on the retrieved variable, I need to create tasks dynamically. Say, if Synapse returns 3, then I need to create 3 tasks.
My idea was:

DAG 1 - Access Azure Synapse and get the variable. Update it in an Airflow Variable. Trigger DAG 2 using TriggerDagRunOperator.
DAG 2 - Create tasks depending on the Airflow Variable updated in DAG 1.
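To make DAG 2 concrete, here is a sketch of what I have in mind, assuming DAG 1 stored the count in the "num_runs" Airflow Variable as in my code above (the process callable is a placeholder; note the Variable.get at module level runs every time the scheduler parses the file):

```python
from datetime import datetime

from airflow.models import DAG, Variable
from airflow.operators.python import PythonOperator


def process(run_index, **kwargs):
    # Placeholder for the per-task work.
    print(f"processing slice {run_index}")


# Read the count written by parent_dag; default to 0 if it is not set yet
# so the file still parses before the first parent run.
num_runs = int(Variable.get("num_runs", default_var=0))

with DAG(
    dag_id='target_dag',
    start_date=datetime(2021, 6, 19),
    schedule_interval=None,  # only runs when triggered by parent_dag
) as dag:
    # One task per unit of work: num_runs == 3 yields process_0..process_2.
    for i in range(num_runs):
        PythonOperator(
            task_id=f'process_{i}',
            python_callable=process,
            op_kwargs={'run_index': i},
        )
```

My concern with this approach is that the task count only changes after the scheduler re-parses target_dag, so I am not sure it is the right way to do this.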
Any inputs on how I can achieve this?