
I'm using MWAA on AWS for a workflow orchestration project. I have a DAG folder structure like this (it is uploaded to S3 with the same structure, and the MWAA DAGs folder is set to src/dags):


src/
├─ dags/
│  ├─ first_dag/
│  │  ├─ tasks/
│  │  │  ├─ task_01/
│  │  │  │  ├─ task.py
│  │  │  │  ├─ __init__.py
│  │  │  ├─ task_02/
│  │  │  │  ├─ task.py
│  │  │  │  ├─ __init__.py
│  │  │  ├─ __init__.py
│  │  ├─ dag.py
│  │  ├─ __init__.py
│  ├─ __init__.py
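Before uploading, one quick sanity check is to confirm that every sub-directory in a tree like this one is a regular package. The stdlib sketch below (`missing_init_files` is a hypothetical helper name, not part of Airflow or MWAA) lists sub-directories of a DAGs folder that lack an `__init__.py`. Python 3 can also import such directories as namespace packages, so a hit here is a consistency warning rather than a guaranteed cause of the import error.

```python
import os


def missing_init_files(dags_root):
    """Hypothetical helper: list sub-directories of dags_root with no __init__.py.

    The DAGs folder itself is skipped because Airflow puts it on sys.path
    directly, so it does not need to be a package.
    """
    missing = []
    for dirpath, dirnames, filenames in os.walk(dags_root):
        # Ignore bytecode caches and walk in a stable order.
        dirnames[:] = sorted(d for d in dirnames if d != "__pycache__")
        if dirpath == dags_root:
            continue
        if "__init__.py" not in filenames:
            missing.append(os.path.relpath(dirpath, dags_root))
    return missing
```

For the layout shown, `missing_init_files("src/dags")` should return an empty list; any path it does return is a directory that the `first_dag.tasks...` style of import would have to treat as a namespace package.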

src/dags/first_dag/dag.py looks something like this:

from airflow.models import DAG
from first_dag.tasks.task_01.task import create_task as create_task_01
from first_dag.tasks.task_02.task import create_task as create_task_02

default_args = {
    'owner': 'Me'
}

with DAG(dag_id='test', schedule_interval=None, default_args=default_args) as dag:
    task_01 = create_task_01()
    task_02 = create_task_02()

    task_01 >> task_02

and src/dags/first_dag/tasks/task_01/task.py and src/dags/first_dag/tasks/task_02/task.py are essentially the same file, like this:

from airflow.operators.dummy import DummyOperator


def create_task():
    task = DummyOperator(
        task_id='dummy_task'  # task_ids must be unique within a DAG, so task_02/task.py needs a different value
    )
    return task

My understanding from the official Airflow docs is that the DAGs folder is automatically added to the PYTHONPATH env variable. Yet, the MWAA UI shows a DAG import error:

Broken DAG: [/usr/local/airflow/dags/first_dag/dag.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/first_dag/dag.py", line 2, in <module>
    from first_dag.tasks.task_01.task import create_task as create_task_01
ModuleNotFoundError: No module named 'first_dag.tasks'

I don't really understand why Python can't find the module. Does MWAA do something different with PYTHONPATH? Am I importing incorrectly? How can I resolve this?

Edit: I also tried adding the first_dag directory to a plugins.zip archive (with dag.py removed) to see whether it would be recognized as a plugin, but I still get the ModuleNotFoundError.
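If you retry the plugins.zip route, one thing worth checking is that `first_dag` sits at the root of the archive rather than nested under `src/dags/`. The stdlib sketch below (`build_plugins_zip` is a hypothetical helper name) zips a package directory so that its own folder becomes the top-level entry in the archive. Python can import packages straight from a zip file that is on `sys.path`, which is what the accompanying test exercises; how and where MWAA unpacks plugins.zip is not modeled here.

```python
import os
import zipfile


def build_plugins_zip(package_dir, zip_path):
    """Hypothetical helper: zip package_dir with its folder at the archive root.

    e.g. src/dags/first_dag/tasks/__init__.py -> first_dag/tasks/__init__.py
    """
    parent = os.path.dirname(os.path.abspath(package_dir))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(package_dir):
            # Skip compiled bytecode; it is regenerated on import.
            dirs[:] = [d for d in dirs if d != "__pycache__"]
            for name in files:
                if name.endswith(".pyc"):
                    continue
                full_path = os.path.join(root, name)
                # Archive names are relative to the package's parent directory.
                zf.write(full_path, os.path.relpath(full_path, parent))
    return zip_path
```

For example, `build_plugins_zip("src/dags/first_dag", "plugins.zip")` would produce entries such as `first_dag/tasks/task_01/task.py`, whereas zipping from the repository root would produce `src/dags/first_dag/...`, and `first_dag` would then not be importable from the archive root.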

jsxgd

1 Answer


Here's a similar Stack Overflow question: Set PYTHONPATH in MWAA

does MWAA do something different with PYTHONPATH?

No, it doesn't. However, you never set PYTHONPATH explicitly yourself; Airflow adds the DAGs folder to sys.path on your behalf.
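To see the distinction between the PYTHONPATH environment variable and the import path Python actually searches, here is a minimal stdlib sketch you could run inside a DAG file (`import_path_report` is a hypothetical helper name). PYTHONPATH is only read when the interpreter starts, while `sys.path` is the live list that imports search. Per Airflow's "Modules Management" documentation, Airflow appends the DAGs folder to `sys.path`, so on MWAA you would expect `/usr/local/airflow/dags` (the path from the traceback) to show up in `sys.path` even if PYTHONPATH is empty.

```python
import os
import sys


def import_path_report(folder):
    """Hypothetical helper: is `folder` on PYTHONPATH and/or on sys.path?"""
    target = os.path.abspath(folder)
    env_entries = [p for p in os.environ.get("PYTHONPATH", "").split(os.pathsep) if p]
    return {
        "in_pythonpath_env": target in {os.path.abspath(p) for p in env_entries},
        # Empty-string entries in sys.path mean "current directory"; skip them.
        "in_sys_path": target in {os.path.abspath(p) for p in sys.path if p},
    }


if __name__ == "__main__":
    # Path taken from the MWAA traceback; adjust for other environments.
    print(import_path_report("/usr/local/airflow/dags"))
```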

Am I importing incorrectly? How can I resolve?

dag.py is already inside the first_dag sub-package, so first_dag can be left out of the import statement:

from tasks.task_01.task import create_task as create_task_01
from tasks.task_02.task import create_task as create_task_02
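To check which import form resolves in a given environment without triggering the full DAG import, a small diagnostic can walk a dotted name prefix by prefix with `importlib.util.find_spec` and report where resolution stops. This is a sketch; `first_unresolvable` is a hypothetical helper name, and the two module names in the `__main__` block are the ones from the question and this answer.

```python
import importlib.util


def first_unresolvable(dotted_name):
    """Return the shortest prefix of dotted_name that cannot be found, or None.

    Note: find_spec("a.b") imports the parent package "a" first, so this
    executes the __init__.py files along the way.
    """
    parts = dotted_name.split(".")
    for i in range(1, len(parts) + 1):
        prefix = ".".join(parts[:i])
        try:
            spec = importlib.util.find_spec(prefix)
        except ModuleNotFoundError:
            # Raised when the parent was found but is a plain module, not a package.
            return prefix
        if spec is None:
            return prefix
    return None


if __name__ == "__main__":
    for name in ("first_dag.tasks.task_01.task", "tasks.task_01.task"):
        print(name, "->", first_unresolvable(name) or "resolves")
```

Logging this from dag.py on MWAA would show whether the failure is at `first_dag` itself or at `first_dag.tasks`, which the traceback alone does not make obvious.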

This is explained in more detail in this Stack Overflow question, which is also referenced directly in the Airflow documentation.

Andrew Nguonly