0

Hello all, I am working with Airflow. Here is the scenario I am trying to resolve: I want to create DAGs dynamically after a function runs.

try:
    import os
    import sys

    from datetime import timedelta,datetime
    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.utils.task_group import TaskGroup
    import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


# ===============================================
# Shared default arguments for every task in the parent "project" DAG.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    'email': ['shahsoumil519@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}
# The "project" DAG itself is created exactly once, by the
# ``with DAG(dag_id="project", ...) as dag:`` block further down. A second
# DAG(dag_id="project") built here was dead code: that later block rebinds
# ``dag`` before anything reads this object.
# ================================================


class XcomHelper:
    """Small convenience wrapper around the ``ti`` entry of a task context.

    The keyword arguments given to the constructor are kept verbatim as
    ``self.context``. Both helpers below are best-effort: any exception raised
    while talking to ``ti`` is swallowed and reported through a sentinel return
    value instead of propagating.
    """

    def __init__(self, **context):
        self.context = context

    def get(self, key=None):
        """Pull ``key`` from XCom through ``context["ti"]``.

        Returns the pulled value, or the string ``"Error"`` if anything fails
        (including a missing ``ti``).
        """
        try:
            task_instance = self.context.get("ti")
            pulled = task_instance.xcom_pull(key=key)
        except Exception:
            return "Error"
        return pulled

    def push(self, key=None, value=None):
        """Push ``value`` under ``key`` through ``context["ti"]``.

        Returns ``True`` on success and ``False`` if anything fails (including
        a missing ``ti``).
        """
        try:
            self.context['ti'].xcom_push(key=key, value=value)
        except Exception:
            return False
        return True



def create_dag(dag_id, schedule, dag_number, default_args):
    """Return a new DAG called ``dag_id`` that holds a single hello-world task.

    The only task is a PythonOperator whose task_id is also ``dag_id`` and
    which prints 'Hello World'. ``dag_number`` is accepted for caller
    compatibility but is not used.

    NOTE(review): no ``catchup`` argument is passed, so Airflow's configured
    default applies; combined with an old ``start_date`` in ``default_args``
    that may schedule many backfill runs -- confirm this is intended.
    """
    def hello_world_py():
        print('Hello World')

    with DAG(dag_id, schedule_interval=schedule, default_args=default_args) as generated_dag:
        PythonOperator(task_id=dag_id, python_callable=hello_world_py)
    return generated_dag


def simple_task(**context):
    """Create one ``hello_world_<n>`` DAG per entry of ``DATA``.

    Each DAG is built with ``create_dag`` and stored in this module's globals
    under its dag_id. If building one DAG fails, the error is printed and the
    loop continues with the next entry. ``context`` is accepted because this
    function is run through a PythonOperator; it is not used.

    NOTE(review): this function runs as a *task*, i.e. inside a worker at run
    time, not when the scheduler parses this file. DAGs put into ``globals()``
    here are therefore most likely never seen by the scheduler -- confirm
    before relying on this (see the comments on this question).
    """
    DATA = ["soumil", "Shah"]
    schedule = '@daily'

    # enumerate() starts at 0, so every entry gets a DAG; the previous
    # range(1, len(DATA)) silently skipped DATA[0].
    for dag_number, _entry in enumerate(DATA):
        dag_id = 'hello_world_{}'.format(dag_number)
        print("DAG ID : {} ".format(dag_id))
        try:
            # A fresh default_args dict per DAG, as before.
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
        except Exception as e:
            print("Error : {} ".format(e))

# Parent "project" DAG: a single task that runs simple_task once.
# NOTE(review): per the comments on this question, DAGs built inside a running
# task are generally not picked up by the scheduler.
with DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False) as dag:

    # ``python_callable=simple_task`` is evaluated before the assignment, so it
    # still refers to the simple_task *function*; afterwards the module-level
    # name ``simple_task`` refers to this operator.
    # ``provide_context`` is omitted: simple_task accepts **context but ignores
    # it, and the flag is no longer needed (or accepted) in Airflow 2.
    simple_task = PythonOperator(task_id="simple_task",
                                 python_callable=simple_task)


I want to create these DAGs based on the length of the DATA variable; that data comes from a database.

I tried looking into [link not preserved in this copy]

Any help would be great.

Revised code:

try:
    import os
    import sys

    from datetime import timedelta, datetime
    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator

    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


def create_dag(dag_id, schedule, dag_number, default_args):
    """Build and return a DAG named ``dag_id`` containing one hello-world task.

    The task's id is the same as ``dag_id`` and it prints 'Hello World'.
    ``dag_number`` is part of the signature but is not used.
    """
    new_dag = DAG(
        dag_id,
        schedule_interval=schedule,
        default_args=default_args,
    )

    def hello_world_py():
        print('Hello World')

    # Operators created inside the ``with`` block are attached to new_dag.
    with new_dag:
        PythonOperator(
            task_id=dag_id,
            python_callable=hello_world_py,
        )

    return new_dag


def simple_task():
    """Create one ``hello_world_<n>`` DAG per entry of ``DATA``, plus a local "project" DAG.

    Each generated DAG is built with ``create_dag`` and stored in this module's
    globals under its dag_id; a failure for one entry is printed and skipped.

    NOTE(review): the "project" DAG below is a local object inside this
    function, and nothing in this snippet calls simple_task() at import time,
    so as written the module exposes no DAG when the scheduler parses it. Even
    if it did, DAGs created by a running task are generally not discovered by
    the scheduler (see the comments on this question) -- the factory usually
    has to run at module level.
    """
    DATA = ["soumil", "Shah", "Shah2"]

    for dag_number, _entry in enumerate(DATA):
        dag_id = 'hello_world_{}'.format(dag_number)
        print("DAG ID : {} ".format(dag_id))
        try:
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            globals()[dag_id] = create_dag(dag_id, '@daily', dag_number, default_args)
        except Exception as e:
            print("Error : {} ".format(e))

    def trigger_function():
        print("HEREE")
        simple_task()

    with DAG(dag_id="project", schedule_interval="@once",
             default_args={'owner': 'airflow', 'start_date': datetime(2018, 1, 1)},
             catchup=False):
        # trigger_function takes no arguments, so provide_context=True must not
        # be passed: Airflow 1.10 would call it with the task context as keyword
        # arguments (TypeError), and Airflow 2 no longer uses that flag.
        PythonOperator(task_id="trigger_function", python_callable=trigger_function)
Soumil Nitin Shah
  • 634
  • 2
  • 7
  • 18
  • Do you need the dag 'project'? Your current code is creating a dag that generates dags. – jay.cs Mar 31 '21 at 22:15
  • Well, I want to generate DAGs based on the function: once I execute that function it will return, say, 2 items, and what I want to do then is create 2 DAGs, if that makes sense. – Soumil Nitin Shah Mar 31 '21 at 22:19

1 Answer

0

I removed a few lines from your code to keep the answer to the point. The code below will generate DAGs like hello_world_0, hello_world_1, ... based on the contents of DATA.

EDIT: I used Airflow v1.10.x, but the code should also work for v2.x.

Suggestions:

  1. Make the task names different from the DAG names.
  2. The dag_number variable is currently unused, so it can be removed.

The DAGs will look like this:

enter image description here

try:
    import os
    import sys

    from datetime import timedelta, datetime
    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator

    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


def create_dag(dag_id, schedule, dag_number, default_args):
    """Return a fresh DAG named ``dag_id`` with a single 'Hello World' task.

    The task reuses ``dag_id`` as its task_id (the answer suggests giving
    tasks distinct names). ``dag_number`` is accepted but not used.
    """
    def hello_world_py():
        print('Hello World')

    with DAG(dag_id, schedule_interval=schedule, default_args=default_args) as built_dag:
        PythonOperator(task_id=dag_id, python_callable=hello_world_py)
    return built_dag


def simple_task():
    """Build one ``hello_world_<index>`` DAG per item in DATA and publish it at module level.

    Every DAG produced by ``create_dag`` is stored in this module's globals
    under its dag_id. If one DAG cannot be built, the error is printed and
    the remaining items are still processed.
    """
    DATA = ["soumil", "Shah", "Shah2"]

    for index, _item in enumerate(DATA):
        try:
            dag_id = 'hello_world_{}'.format(index)
            print("DAG ID : {} ".format(dag_id))
            dag_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            globals()[dag_id] = create_dag(dag_id, '@daily', index, dag_args)
        except Exception as e:
            print("Error : {} ".format(e))


# Called at import time, so the generated DAGs already exist as module
# globals whenever this file is parsed.
simple_task()
jay.cs
  • 183
  • 1
  • 7
  • Thanks a lot; however, I want to run the DAG creation process only when I manually press the button in the UI. – Soumil Nitin Shah Mar 31 '21 at 23:26
  • I have updated the description above: I only want to run that when it is triggered manually from the UI. Can you help? – Soumil Nitin Shah Mar 31 '21 at 23:28
  • A SubDag might be more suitable, with a modified use case. A DAG generating another DAG is generally not recommended and I would say not possible. – jay.cs Apr 01 '21 at 02:09
  • https://stackoverflow.com/questions/62962386/can-an-airflow-task-dynamically-generate-a-dag-at-runtime – jay.cs Apr 01 '21 at 02:16
  • Thanks a lot. Here is the use case I am trying to achieve: there is a website A where users enter information about a task, and that information is added to a database. In Airflow I want to create a DAG from the information the user entered, generated dynamically. The problem is that whenever a user adds an item I want to create a DAG for it, and right now I have to redirect to the Airflow dashboard, which runs the script and creates the DAG. What do you think is the best way to do this? – Soumil Nitin Shah Apr 01 '21 at 16:31