Hello all, I am working with Airflow. Here is the scenario I am trying to resolve: I want to create DAGs dynamically after a function has run.
try:
import os
import sys
from datetime import timedelta,datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.task_group import TaskGroup
import pandas as pd
print("All Dag modules are ok ......")
except Exception as e:
print("Error {} ".format(e))
# ===============================================
# Defaults handed to the "project" DAG below.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    # One retry, with a one-minute delay configured between attempts.
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    # Mail is requested on failure only, not on retry.
    "email": ["shahsoumil519@gmail.com"],
    "email_on_failure": True,
    "email_on_retry": False,
}

# NOTE(review): a DAG with the same dag_id ("project") is constructed again by
# the `with DAG(...)` statement further down this file, which rebinds `dag` —
# confirm whether this first instance is still needed.
dag = DAG(
    dag_id="project",
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
)
# ================================================
class XcomHelper(object):
    """Best-effort wrapper around a task instance's XCom pull/push calls.

    The helper stores the keyword arguments it is constructed with and looks
    up the task instance under the ``"ti"`` key.  Failures are never raised to
    the caller: they are logged and reported through the return value.
    """

    def __init__(self, **context):
        # Keep the whole mapping; only the "ti" entry is read by the methods.
        self.context = context

    def get(self, key=None):
        """Return the value pulled for ``key``, or ``"Error"`` if the pull fails.

        NOTE: the ``"Error"`` sentinel cannot be told apart from a stored
        value that happens to be the string ``"Error"``.
        """
        import logging  # local import keeps the block self-contained

        try:
            return self.context.get("ti").xcom_pull(key=key)
        except Exception:
            # Previously the exception was discarded without a trace; record
            # it so a missing "ti" or a failing pull can be diagnosed.
            logging.getLogger(__name__).exception(
                "XCom pull failed for key %r", key
            )
            return "Error"

    def push(self, key=None, value=None):
        """Push ``value`` under ``key``; return ``True`` on success, else ``False``."""
        import logging  # local import keeps the block self-contained

        try:
            self.context['ti'].xcom_push(key=key, value=value)
        except Exception:
            # Same best-effort contract as ``get``: log, then signal failure.
            logging.getLogger(__name__).exception(
                "XCom push failed for key %r", key
            )
            return False
        return True
def create_dag(dag_id, schedule, dag_number, default_args):
    """Build a DAG holding a single task that prints ``Hello World``.

    Args:
        dag_id: Identifier of the new DAG; also used as the id of its task.
        schedule: Forwarded as the DAG's ``schedule_interval``.
        dag_number: Accepted but not used by this function.
        default_args: Forwarded as the DAG's ``default_args``.

    Returns:
        The newly constructed ``DAG``.
    """

    def hello_world_py():
        print('Hello World')

    with DAG(dag_id, schedule_interval=schedule, default_args=default_args) as generated:
        # Created inside the ``with`` block, as in the original, so the
        # operator belongs to ``generated``.
        PythonOperator(task_id=dag_id, python_callable=hello_world_py)
    return generated
def simple_task(**context):
    """Create one ``hello_world_<n>`` DAG per entry of ``DATA``.

    Each DAG is built by ``create_dag`` with a daily schedule and stored in
    this module's globals under its dag id.  An error while building one DAG
    is printed and does not prevent the remaining ones from being built.

    Args:
        **context: Keyword arguments supplied by the caller; not used here.

    NOTE(review): when this runs as a task, the DAG objects are stored in the
    globals of the process executing the task.  Airflow presumably only
    discovers DAGs that exist when the DAG file itself is parsed, so DAGs
    created here may never reach the scheduler — confirm against the Airflow
    documentation on dynamic DAG generation.
    """
    DATA = ["soumil", "Shah"]
    schedule = '@daily'
    # Start at index 0: the former range(1, len(DATA)) skipped the first
    # entry and therefore produced one DAG fewer than there are entries.
    for n in range(len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            # A fresh dict per DAG so the generated DAGs never share one.
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            globals()[dag_id] = create_dag(dag_id, schedule, n, default_args)
        except Exception as e:
            # Deliberate best-effort: report and continue with the next entry.
            print("Error : {} ".format(e))
# Entry-point DAG "project": scheduled "@once", with a single PythonOperator
# task that calls ``simple_task``.
with DAG(
    dag_id="project",
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
) as dag:
    # The right-hand side is evaluated first, so ``python_callable`` receives
    # the function; afterwards the name ``simple_task`` refers to the operator.
    simple_task = PythonOperator(
        task_id="simple_task",
        python_callable=simple_task,
        provide_context=True,
    )
I want to create these DAGs based on the length of the DATA variable; in the real project that data comes from the database.
I tried looking into:
- https://www.astronomer.io/guides/dynamically-generating-dags
- Can an Airflow task dynamically generate a DAG at runtime?
- https://medium.com/@flavio.mtps/making-use-of-python-globals-to-dynamically-create-airflow-dags-124e556b704e
Any help would be great.
Revised code:
try:
import os
import sys
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# from airflow.operators.email_operator import EmailOperator
# from airflow.utils.trigger_rule import TriggerRule
# from airflow.utils.task_group import TaskGroup
# import pandas as pd
print("All Dag modules are ok ......")
except Exception as e:
print("Error {} ".format(e))
def create_dag(dag_id, schedule, dag_number, default_args):
    """Return a new DAG whose only task prints ``Hello World``.

    Args:
        dag_id: Id given to the DAG and reused as the id of its single task.
        schedule: Passed through as ``schedule_interval``.
        dag_number: Part of the signature but not read by the body.
        default_args: Passed through as the DAG's ``default_args``.

    Returns:
        The constructed ``DAG`` instance.
    """

    def hello_world_py():
        print('Hello World')

    with DAG(dag_id, schedule_interval=schedule, default_args=default_args) as new_dag:
        # The operator is created inside the ``with`` block, as in the
        # original; its return value is not needed afterwards.
        PythonOperator(task_id=dag_id, python_callable=hello_world_py)
    return new_dag
def simple_task():
    """Store one ``hello_world_<index>`` DAG per entry of ``DATA`` in module globals.

    Every DAG is produced by ``create_dag`` with a ``'@daily'`` schedule.  If
    building one of them raises, the error is printed and the loop moves on
    to the next index.
    """
    DATA = ["soumil", "Shah", "Shah2"]
    for index in range(len(DATA)):
        try:
            name = 'hello_world_{}'.format(str(index))
            print("DAG ID : {} ".format(name))
            globals()[name] = create_dag(
                name,
                '@daily',
                index,
                {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)},
            )
        except Exception as err:
            print("Error : {} ".format(err))
def trigger_function():
    """Print a marker line, then run ``simple_task``."""
    print("HEREE")
    simple_task()
# Entry-point DAG "project": scheduled "@once", with a single PythonOperator
# task that calls ``trigger_function``.
with DAG(
    dag_id="project",
    schedule_interval="@once",
    default_args={'owner': 'airflow', 'start_date': datetime(2018, 1, 1)},
    catchup=False,
) as dag:
    # ``provide_context=True`` was dropped: ``trigger_function`` accepts no
    # arguments, so asking Airflow 1.x to pass the context as keyword
    # arguments makes the call fail with a TypeError, and on Airflow 2.x the
    # flag is deprecated and has no effect.
    # The right-hand side is evaluated first, so ``python_callable`` receives
    # the function; afterwards the name ``trigger_function`` refers to the
    # operator.
    trigger_function = PythonOperator(
        task_id="trigger_function",
        python_callable=trigger_function,
    )