
I have a file that defines the DAG object:

dags/my_dag.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['some@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

bts_dag = DAG(
    'hist_data_etl', default_args=default_args, schedule_interval='@once')

Then in another file, I import the created DAG and define my tasks:

from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start -n {{ params.cluster }}
    sleep 5
'''

start_iparallel_cluster = BashOperator(
    task_id='start_cluster',
    bash_command=template_command,
    retries=3,
    params={'cluster': NUM_ENGINES},
    dag=bts_dag)


import_hist_bts_data_task = PythonOperator(
    task_id='fetch_transform_hist_col',
    python_callable=fetch_and_transform_bts_data_col,
    op_kwargs={
        'bucket': 'some-bucket', 'path': 'hello/', 'num_files': 1
    },
    dag=bts_dag)

start_iparallel_cluster >> import_hist_bts_data_task

Sanity check:

$ airflow list_dags

yields:

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hist_data_etl

However,

$ airflow list_tasks hist_data_etl

doesn't output any of my tasks. Somehow Airflow isn't registering the tasks as belonging to the DAG I defined in the other file.

Please help :)

Cyzanfar
  • Did you try putting both *`DAG` object creation* and *`task`-creation* (`operator`(s) instantiation) in a single file? – y2k-shubham Oct 02 '19 at 04:19
  • I did, and that works fine. I want to be able to build and execute these operators in another file in a parent folder – Cyzanfar Oct 02 '19 at 04:35
  • The reason is that the functions I defined and that are being used by the PythonOperator are located in another directory. I tried importing them into dags/my_dag.py but I'm having import issues because those functions are in another parent folder – Cyzanfar Oct 02 '19 at 04:39
  • For resolving import errors in Airflow, read [this](https://stackoverflow.com/a/57693865/3679900) – y2k-shubham Oct 02 '19 at 04:45 (see the sketch after this comment thread)
  • That's helpful, thank you. What about the other way around -> importing the defined DAG into another module – Cyzanfar Oct 02 '19 at 05:31
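
For reference, a common workaround for the import problem discussed in the comments above is to extend sys.path at the top of the DAG file before importing project modules (or to add the project root to PYTHONPATH). The snippet below is only a sketch: the directory depth and the project layout it assumes (ingestion/airflow_home/dags/my_dag.py under a project root that contains the ingestion package) are inferred from the question, not confirmed.

# dags/my_dag.py -- sketch of the sys.path workaround (assumed layout)
import os
import sys

# assume the project root (the folder containing the 'ingestion' package)
# sits three levels above this file
PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', '..', '..'))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

from ingestion.datatransformer import fetch_and_transform_bts_data_col  # noqa: E402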

1 Answer

  • Because of the way DAG-file parsing works in Airflow, I wouldn't expect this to work.
  • I don't have complete knowledge of the internals, but Airflow spawns child processes to parse DAG-definition files (files identified by certain traits). Each process parses a different subset of files, so it is likely that your two files are processed by different processes (see the sketch below).
  • I believe that in your implementation the logical order of parsing the files (DAG file first, then task file) is not preserved, and therefore things don't work.
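
As a rough illustration of what that means (a simplified sketch, not Airflow's actual DagBag implementation): each candidate file is loaded as an independent module, and only DAG objects found in that module's global namespace get registered.

# simplified sketch of what a DAG-file parser effectively does
# (hypothetical helper, not Airflow's real implementation)
import importlib.util

from airflow.models import DAG


def parse_dag_file(filepath):
    # each candidate file is loaded as its own, independent module ...
    spec = importlib.util.spec_from_file_location('candidate_dag_module', filepath)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # ... and only DAG objects sitting at the module's global scope are collected
    return [obj for obj in vars(module).values() if isinstance(obj, DAG)]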

However, with some modifications to your approach, you can get this working:

first file

# dag_object_creator.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['some@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

def create_dag_object():
    bts_dag = DAG(dag_id='hist_data_etl',
                  default_args=default_args,
                  schedule_interval='@once')
    return bts_dag

second file

# tasks_creator.py

# this import statement is problematic
# from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start -n {{ params.cluster }}
    sleep 5
'''

def create_bash_task(bts_dag):
    start_iparallel_cluster = BashOperator(
        task_id='start_cluster',
        bash_command=template_command,
        retries=3,
        params={'cluster': NUM_ENGINES},
        dag=bts_dag)
    return start_iparallel_cluster


def create_python_task(bts_dag):
    import_hist_bts_data_task = PythonOperator(
        task_id='fetch_transform_hist_col',
        python_callable=fetch_and_transform_bts_data_col,
        op_kwargs={
            'bucket': 'pilota-ml-raw-store', 'path': 'flights/', 'num_files': 1
        },
        dag=bts_dag)
    return import_hist_bts_data_task

third file

# dag_definition_file.py

import dag_object_creator
import tasks_creator

# create dag object
# stuff from 'dag_object_creator.py' can be put here directly;
# I just broke things down for clarity
bts_dag = dag_object_creator.create_dag_object()

# create tasks
start_iparallel_cluster = tasks_creator.create_bash_task(bts_dag)
import_hist_bts_data_task = tasks_creator.create_python_task(bts_dag)

# chaining tasks
start_iparallel_cluster >> import_hist_bts_data_task

The above layout of code enforces the following behaviour:

  • up front, the parsing processes pick up only dag_definition_file.py (the other two files are skipped because no DAGs are created at their global scope)

  • as and when the import statements are executed, those two files get parsed

  • when the DAG / task creation statements are executed, the DAG and task objects respectively get created in the global scope of dag_definition_file.py

Therefore everything falls into place nicely and this implementation should work (not tested, but based on anecdotal knowledge).
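
If you want to check the wiring without waiting for the scheduler, one option (again a sketch, assuming the three files live in your dags folder and the imports resolve) is to load the folder through a DagBag and inspect the task ids; `airflow list_tasks hist_data_etl` should then show both tasks as well.

# quick verification sketch -- the dag_folder path is an assumption
from airflow.models import DagBag

dag_bag = DagBag(dag_folder='ingestion/airflow_home/dags', include_examples=False)
dag = dag_bag.get_dag('hist_data_etl')
print(dag.task_ids)  # expected: ['start_cluster', 'fetch_transform_hist_col']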



y2k-shubham
  • I moved the definitions of the tasks and the DAG into the `mydag.py` file, but I'm getting import issues when trying to import a file from another parent directory. – Cyzanfar Oct 02 '19 at 15:17