
* TL;DR: This question was originally about a different symptom; the actual cause was later determined to be the one now described in the question's updated title. Skip to "Update 2" for the most relevant question details.

I have a DAG file that imports a Python list of dicts from a Python file in another location and creates a DAG based on the list's dict values, and Airflow is having a weird problem where it appears to see something different than what I see when I run the DAG file manually. A snippet looks like...

...
import sys
from os import environ

environ["PROJECT_HOME"] = "/path/to/some/project/files"
# make the project files importable
sys.path.append(environ["PROJECT_HOME"])
import tables as tt

tables = tt.tables

for table in tables:
    print(table)
    assert isinstance(table, dict)
    # <create some dag task 1>
    # <create some dag task 2>
    ...

When running the .py file manually from the ~/airflow/dag/ dir, no errors are thrown and the for loop prints the dicts, but Airflow apparently sees things differently in the webserver and when running airflow list_dags. Running airflow list_dags, I get the error

    assert isinstance(table, dict)
AssertionError

and I don't know how to test what is causing this, since, again, when running the .py file manually from the DAG location there is no problem: the print statement shows dicts, and the webserver UI shows no further error message.
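For reference, one way I can think of to narrow this down is a minimal diagnostic sketch like the following (works under Python 2 or 3; tt and tables are just the names from the snippet above), placed right before the failing assert, to see which file the import actually resolved to:

import sys

print(tt.__file__)                   # which tables.py actually got imported
print(getattr(tt, "__all__", None))  # what that module claims to export
print(sys.path)                      # the search-path order the scheduler sees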

Anyone know what could be going on here? Maybe something about how the imports are working?

* Update 1:

I'm seeing more weirdness: when calling functions from the imported Python module, everything runs fine when running the DAG file manually, but airflow list_dags says...

AttributeError: 'module' object has no attribute 'my_func'

making me suspect import weirdness even more, even though this is the exact same process I use in another DAG file (i.e., setting some environ values and appending to sys.path) to import modules for that DAG, and I have no problems there.

* Update 2:

The problem appears to be (after printing various sys.path, environ, and module.__all__ info at the failing assert) that a similarly named module is being imported from another project for which I followed this same exact procedure. I.e., I have another DAG file that does...

...
import sys
from os import environ

environ["PROJECT_HOME"] = "/path/to/another/project/files"
# make the other project's files importable; it also has its own tables.py
sys.path.append(environ["PROJECT_HOME"])
import tables as tt

tables = tt.tables

for table in tables:
    print(table)
    assert isinstance(table, dict)
    # <create some dag task 1>
    # <create some dag task 2>
    ...

and this other project's home is what ends up being used to load a similarly named module that also has an object named what I was expecting (even when I insert the project's folder at the front of sys.path). Other than making packaged DAGs, is there a way to keep Airflow from combining all of the environ and sys.path values of different DAGs (since I use $PROJECT_HOME in various bash and python task scripts)?
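For context, a minimal sketch of why I think the collision happens (paths are the hypothetical ones from the snippets above): all DAG files are parsed in the same scheduler/webserver Python process, so sys.path and environ mutations from one DAG file are visible to every other DAG file, and the first tables module imported is cached process-wide:

import sys

# After the scheduler has parsed both DAG files, sys.path contains both
# project homes, in whatever order the files happened to be parsed:
#   [..., "/path/to/another/project/files", "/path/to/some/project/files"]
# "import tables" resolves against that shared path, and the winner is then
# cached in sys.modules, so later DAG files get the cached module no matter
# what their own sys.path.append (or insert) calls do:
print(sys.modules.get("tables"))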

lampShadesDrifter
  • My current workaround is to have the various bash and python files parameterized by a project_home value that gets set and passed from the DAG file, as a way to propagate the project home abs path to all files from a centralized location (see the sketch after these comments). For module logic that happens *before* DAG task creation, I moved that code into the airflow file (makes the file longer than I'd like, but I don't see another way on this front). – lampShadesDrifter Oct 17 '19 at 01:52
  • As of this writing, from discussions on the airflow mailing list, it appears that there is currently no way to have different dags use different sys.path and environ values. Unless anyone has any better ideas, the workaround above will have to suffice for now. – lampShadesDrifter Oct 18 '19 at 02:09
  • http://mail-archives.apache.org/mod_mbox/airflow-users/201910.mbox/%3CCAKqkst9C7v-_i6RX+pEGEL=we6rqqug=3NE5MQZNDnGzjM9faA@mail.gmail.com%3E – lampShadesDrifter Nov 01 '19 at 01:46
  • Found this to be very helpful as well: https://stackoverflow.com/q/67631/8236733 – lampShadesDrifter Dec 04 '19 at 23:44
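A minimal sketch of the workaround described in the first comment above, assuming Airflow 1.x (the DAG id, paths, and script name are hypothetical): the DAG file owns the project home and passes it explicitly to each task, so nothing needs to be set in environ or sys.path at parse time:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

PROJECT_HOME = "/path/to/some/project/files"  # hypothetical path

dag = DAG("my_project_dag",
          start_date=datetime(2019, 10, 1),
          schedule_interval=None)

do_stuff = BashOperator(
    task_id="my_task",
    # the script receives its project home as an argument rather than
    # reading a (shared, mutable) environment variable
    bash_command="python {{ params.project_home }}/script.py --project-home {{ params.project_home }}",
    params={"project_home": PROJECT_HOME},
    dag=dag)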

1 Answer


For bringing in specific modules from other paths, I use the solution here, importing other Python modules by specifying their absolute file paths.
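A minimal sketch of that approach, assuming Python 3 (importlib.util); the module name and path are the hypothetical ones from the question:

import importlib.util

# load this exact file, so a similarly named tables.py from another
# project's sys.path entry cannot shadow it
spec = importlib.util.spec_from_file_location(
    "project_tables", "/path/to/some/project/files/tables.py")
tt = importlib.util.module_from_spec(spec)
spec.loader.exec_module(tt)

tables = tt.tables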

For running various Python scripts as Airflow tasks using different Python interpreters, I do something like...

do_stuff_a = BashOperator(
    task_id='my_task_a',
    bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py',
    execution_timeout=timedelta(minutes=30),
    dag=dag)

as done in a similar question here.

lampShadesDrifter