
I am new to Airflow and I'm trying to run a DAG that references a custom operator (my_operators.py) in Airflow v1.10.14.

Issue: I'm getting the following error in the Airflow UI:

Broken DAG: [/opt/airflow/dags/test_operator.py] No module named 'operators.my_operators'

Directory structure:

airflow
|-- dags
     |-- test_operator.py
     |-- requirements.txt
     |-- __init__.py
|-- plugins
     |-- __init__.py
     |-- operators
           |-- my_operators.py
           |-- __init__.py
     |-- airflow.cfg

I can successfully import the operator when its file (my_operators.py) sits directly in the "plugins" folder, using

from my_operators import MyFirstOperator

or when it is under the "dags/operators/" directory using

from operators.my_operators import MyFirstOperator

But not when it's in the "plugins/operators/" directory. It seems that Airflow cannot find the "operators" folder in the "plugins" directory, although it does find it in the "dags" directory. What am I doing wrong?

Additional Context:

Dag file content:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from operators.my_operators import MyFirstOperator
 
 
dag = DAG('my_test_dag', description='Another tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2019, 5, 29), catchup=False)
 
dummy_task = DummyOperator(task_id='dummy_task', dag=dag)
 
operator_task = MyFirstOperator(my_operator_param='This is a test.',
                                task_id='my_first_operator_task', dag=dag)
 
dummy_task >> operator_task

Custom operator file content:

import logging
 
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
 
log = logging.getLogger(__name__)
 
class MyFirstOperator(BaseOperator):
 
    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        self.operator_param = my_operator_param
        super(MyFirstOperator, self).__init__(*args, **kwargs)
 
    def execute(self, context):
        log.info("Hello World!")
        log.info('operator_param: %s', self.operator_param)

requirements.txt content:

flask-bcrypt==0.7.1
apache-airflow==1.10.14

All "__init__.py" files are empty.

I tried following the answer in this post, without success: Can't import Airflow plugins

jorgeavelar98

2 Answers


I think you're confused about the {AIRFLOW_HOME}/plugins directory.

Plugins don't work the same way as placing your custom operator in {AIRFLOW_HOME}/dags or {AIRFLOW_HOME}/data.

When you place custom code in either of these two directories, you can declare any arbitrary Python code that can be shared between DAGs. This could be an operator, a default_args dictionary that you want multiple DAGs to share, etc.
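To illustrate the kind of shared, non-plugin code meant here, below is a minimal sketch of a module kept under the dags folder. The module path `dags/common/default_args.py`, the `default_args()` helper, and the values are hypothetical; it assumes the dags folder is on `sys.path`, which is what would let a DAG file write `from common.default_args import default_args`.

```python
# Hypothetical shared module, e.g. {AIRFLOW_HOME}/dags/common/default_args.py
from datetime import datetime, timedelta

# Baseline arguments that several DAGs could share (values are illustrative).
DEFAULT_ARGS = {
    "owner": "airflow",
    "start_date": datetime(2019, 5, 29),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def default_args(**overrides):
    """Return a copy of DEFAULT_ARGS with per-DAG overrides applied."""
    args = dict(DEFAULT_ARGS)  # copy so one DAG's overrides don't leak into others
    args.update(overrides)
    return args
```

A DAG file would then pass something like `default_args=default_args(retries=3)` to `DAG(...)`. Returning a copy rather than the shared dict keeps each DAG's tweaks local.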

The documentation for Airflow 1 for this is here (in Airflow 2 the documentation has been changed to make it much clearer how Airflow uses these directories when you want to add custom code).

Your plugin needs to define an AirflowPlugin subclass. When you implement this class, your Operator will be integrated into Airflow, and its import path will be (assuming you name the plugin my_custom_plugin in your AirflowPlugin subclass):

from airflow.operators.my_custom_plugin import MyFirstOperator

You cannot declare arbitrary Python code to share between DAGs when using plugins: the code has to subclass AirflowPlugin and register your custom components through it (whether it's a Hook, Sensor, Operator, etc.).

Check out the documentation for Plugins in Airflow 1 here - this example shows you exactly what you need to implement.

It's up to you whether you want to go to the trouble of implementing a Plugin. This functionality is meant for when you write an Operator that you want to share and publish for other people to use. If the Operator is just for internal use, then in the overwhelming majority of cases (at least that I've seen) people just use {AIRFLOW_HOME}/dags or {AIRFLOW_HOME}/data.

Daniel T
  • But if I am trying to use my custom operator as a Python module, is it possible to place it under a "plugins/operators" directory and reference it from there, or does it have to go under the "dags" folder? Note: I am not trying to use the custom operator as a plugin, just as a Python module. – jorgeavelar98 Jun 12 '22 at 19:30
  • Yes, as long as the plugins directory is in sys.path, you can store your custom operators there. Bear in mind that the plugins directory is intended for actual Airflow plugins. Is there a specific reason it has to go in this directory? The documentation suggests using dags or data for everything that isn't a plugin, and the `dags` directory is ahead of the `plugins` directory in `sys.path`. Do you have a `dags/operators` directory with an `__init__.py` inside? That would cause the import error you are seeing. – Daniel T Jun 12 '22 at 20:03
  • The specific reason I want to store operators under `plugins/operators`, hooks under `operators/hook`, etc. is that many other online forums reference this article, which recommends this directory structure: https://www.astronomer.io/guides/airflow-importing-custom-hooks-operators/. But yes, operators under `dags/operators` do work for me; ideally, though, I'd like to make it work with `plugins/operators`. Do you have any resources on how to add the `plugins` directory to `sys.path`? I assume this is something that needs to be done in the airflow YAML @daniel – jorgeavelar98 Jun 12 '22 at 22:07
  • @jorgeavelar98 that's fair enough. It doesn't really matter where you store them from Airflow's point of view. Can you check that the directory `dags/operators` does not exist? If it does, does it have an `__init__.py` inside it? I tried exactly what you have and I get your error if I have `dags/operators/__init__.py` and try importing from `operators`. Your plugins directory should already be on the path. Run the command `airflow info` and look at the `python_path` to see. – Daniel T Jun 13 '22 at 01:10
  • So I checked, and the directory `dags/operators` does not exist: https://www.linkpicture.com/q/screenshot_10.png This is what `python_path` looks like for me: https://www.linkpicture.com/q/python-path.png The `airflow/plugins` directory is already in the path, so I'm not sure what the issue is. – jorgeavelar98 Jun 13 '22 at 06:14
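The shadowing Daniel T describes in the comments above can be reproduced outside Airflow. Below is a minimal sketch, assuming only that dags/ is ahead of plugins/ on `sys.path` (as the comments state). It uses throwaway temp directories and a plain stand-in class instead of a real Airflow operator; `resolve_operators` and `package_locations` are hypothetical helpers, not part of Airflow.

```python
import importlib
import importlib.util
import sys
import tempfile
from pathlib import Path
from typing import List


def _make_package(root: Path, pkg: str, module: str, body: str) -> None:
    """Create root/pkg/__init__.py (empty) and root/pkg/<module>.py."""
    pkg_dir = root / pkg
    pkg_dir.mkdir(parents=True, exist_ok=True)
    (pkg_dir / "__init__.py").write_text("")
    (pkg_dir / (module + ".py")).write_text(body)


def _forget(prefix: str) -> None:
    """Drop cached modules so the next import searches sys.path again."""
    for name in list(sys.modules):
        if name == prefix or name.startswith(prefix + "."):
            del sys.modules[name]


def resolve_operators(dags_has_operators: bool) -> str:
    """Try `import operators.my_operators` with dags/ ahead of plugins/.

    Returns "ok" on success, otherwise the ModuleNotFoundError message.
    """
    with tempfile.TemporaryDirectory() as tmp:
        dags, plugins = Path(tmp) / "dags", Path(tmp) / "plugins"
        dags.mkdir()
        # plugins/operators/my_operators.py holding a stand-in (non-Airflow) class
        _make_package(plugins, "operators", "my_operators",
                      "class MyFirstOperator:\n    pass\n")
        if dags_has_operators:
            # Regular package dags/operators/ (has __init__.py) without my_operators.py
            _make_package(dags, "operators", "something_else", "")
        saved_path = list(sys.path)
        sys.path[:0] = [str(dags), str(plugins)]  # dags first, then plugins
        _forget("operators")
        importlib.invalidate_caches()
        try:
            importlib.import_module("operators.my_operators")
            return "ok"
        except ModuleNotFoundError as exc:
            return str(exc)
        finally:
            sys.path[:] = saved_path
            _forget("operators")


def package_locations(name: str) -> List[str]:
    """Directories a top-level package `name` resolves to ([] if none)."""
    spec = importlib.util.find_spec(name)
    if spec is None or spec.submodule_search_locations is None:
        return []
    return list(spec.submodule_search_locations)
```

With `dags_has_operators=True`, Python stops at the first regular package named `operators` (the one in dags/) and only looks for `my_operators` inside it, which yields the same "No module named 'operators.my_operators'" message as in the question. Running `package_locations("operators")` inside the same environment Airflow uses is one way to see which directory actually wins there.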

You should move the "plugins" folder into the "dags" folder.