0

In am trying to call DAG from another DAG( target_dag from parent_dag).

My parent_dag code is :

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def read_metadata(**kwargs):
    asqldb_kv = Variable.get("asql_kv")
    perfom some operations based on asqldb_kv and populate the result_dictionary list
    if len(result_dictionary) > 0:
       my_var = Variable.set("num_runs", len(result_dictionary))
       ti = kwargs['ti']
       ti.xcom_push(key='res', value=result_dictionary)

            
default_args = {
    'start_date': datetime(year=2021, month=6, day=19),
    'provide_context': True
}

with DAG(
    dag_id='parent_dag',
    default_args=default_args,
    schedule_interval='@once',
    description='Test Trigger DAG'
) as dag:

    trigger = TriggerDagRunOperator(

        task_id="test_trigger_dagrun",
        python_callable=read_metadata,
        trigger_dag_id="target_dag"
    )

I am getting the below error :

airflow.exceptions.AirflowException: Invalid arguments were passed to TriggerDagRunOperator (task_id: test_trigger_dagrun). Invalid arguments were:
**kwargs: {'python_callable': <function read_metadata at 0x7ff5f4159620>}

Any help appreciated.

Edit :

python_callable is depreciated in TriggerDagRunOperator - Airflow 2.0.

My requirement is :

I need to access Azure Synapse and get a variable (Say 3). Based on retrieved variable, I need to create tasks dynamically. Say, if Synapse has 3 , then I need to create 3 tasks.

My idea was :

DAG 1 - Access Azure synapse and get Variable. Update this to Airflow Variable. Trigger DAG2 using TriggerDagRunOperator.

DAG 2 - Create tasks depending on the Airflow Variable updated in DAG 1.

Any inputs how can I achieve this?

usr_lal123
  • 650
  • 12
  • 28
  • The documentation page I see does not show a `python_callable` parameter for `TriggerDagRunOperator`. Where did you see that? – Tim Roberts Jun 19 '21 at 03:49
  • https://www.waitingforcode.com/apache-airflow/externally-triggered-dags-apache-airflow/read – usr_lal123 Jun 19 '21 at 03:50
  • https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/dagrun_operator/index.html#airflow.operators.dagrun_operator.TriggerDagRunOperator – usr_lal123 Jun 19 '21 at 03:57
  • Ah, but that's `dagrun_operator.TriggerDagRunOperator`. You're using `trigger_dagrun.TriggerDagRunOperator`. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/trigger_dagrun/index.html – Tim Roberts Jun 19 '21 at 04:00
  • Shame on them for having two different functions with the same name. – Tim Roberts Jun 19 '21 at 04:00
  • Well the documentation links reference vastly different Airflow versions with 1.10+ and 2.0+. Additionally, `airflow.operators.dagrun_operator.TriggerDagRunOperator` is deprecated. 1.10.X reached end-of-life on June 17. There was substantial module reorganization done with the 2.0.0 release – Josh Fell Jun 19 '21 at 04:09
  • I am updating my question/requirement. Thanks. – usr_lal123 Jun 19 '21 at 04:10
  • @Madhanlal I think you can achieve what you want without even using a second DAG. [Check this example](https://stackoverflow.com/a/66907844/10569220). – NicoE Jun 19 '21 at 13:56

0 Answers0