I'm using MWAA on AWS for a workflow orchestration project. My DAG folder structure looks like this (and is uploaded to S3 like this, with the MWAA DAGs folder set to src/dags):
src/
├─ dags/
│ ├─ first_dag/
│ │ ├─ tasks/
│ │ │ ├─ task_01/
│ │ │ │ ├─ task.py
│ │ │ │ ├─ __init__.py
│ │ │ ├─ task_02/
│ │ │ │ ├─ task.py
│ │ │ │ ├─ __init__.py
│ │ │ ├─ __init__.py
│ │ ├─ dag.py
│ │ ├─ __init__.py
│ ├─ __init__.py
src/dags/first_dag/dag.py looks something like this:
from airflow.models import DAG
from first_dag.tasks.task_01.task import create_task as create_task_01
from first_dag.tasks.task_02.task import create_task as create_task_02

default_args = {
    'owner': 'Me'
}

with DAG(dag_id='test', schedule_interval=None, default_args=default_args) as dag:
    task_01 = create_task_01()
    task_02 = create_task_02()
    task_01 >> task_02
and src/dags/first_dag/tasks/task_01/task.py and src/dags/first_dag/tasks/task_02/task.py are basically the same file, like this:
from airflow.operators.dummy import DummyOperator


def create_task():
    task = DummyOperator(
        task_id='dummy_task'
    )
    return task
My understanding from the official Airflow docs is that the DAGs folder is automatically added to the PYTHONPATH env variable. Yet, the MWAA UI shows a DAG import error:
Broken DAG: [/usr/local/airflow/dags/first_dag/dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/airflow/dags/first_dag/dag.py", line 2, in <module>
from first_dag.tasks.task_01.task import create_task as create_task_01
ModuleNotFoundError: No module named 'first_dag.tasks'
I don't really understand why Python can't find the module - does MWAA do something different with PYTHONPATH? Am I importing incorrectly? How can I resolve this?
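
For reference, a minimal diagnostic DAG like the sketch below (the dag_id and start_date are just placeholders I'd make up) could be dropped next to dag.py to print sys.path from a task log and confirm whether /usr/local/airflow/dags is really on the path:

# Diagnostic sketch only, not part of the project: logs sys.path from inside MWAA
# so the task log shows which directories are actually importable from.
import sys
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator


def print_sys_path():
    # Each entry ends up in the log of the 'print_sys_path' task.
    for entry in sys.path:
        print(entry)


with DAG(
    dag_id='debug_sys_path',          # placeholder name
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),  # arbitrary, so the DAG can be triggered manually
) as dag:
    PythonOperator(task_id='print_sys_path', python_callable=print_sys_path)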
Edit: I also tried adding the first_dag directory to a plugins.zip archive (removing the dag.py file) to see if it would be recognized as a plugin - still getting the ModuleNotFound error.
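
For clarity, the plugins.zip I tried contained roughly this layout (the same packages as under src/dags/first_dag, minus dag.py):

first_dag/
├─ tasks/
│ ├─ task_01/
│ │ ├─ task.py
│ │ ├─ __init__.py
│ ├─ task_02/
│ │ ├─ task.py
│ │ ├─ __init__.py
│ ├─ __init__.py
├─ __init__.py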