
I need to generate some DAGs. I've saved the JSON table schema files in a GCP bucket. Files in the bucket associated with Composer are mapped to /home/airflow/gcs/dags/. If I define the method that reads the JSON file after the creation of the DAG, everything works fine. But if I try to move it into "common code" (to put it in a library of mine), the library code can't access the file system; specifically, I can't use the Python json library.

The strange thing is that I define the method outside the DAG creation step, but I invoke it only after the DAG is created!

For completeness, I have no problems if the library code uses only in-memory objects.

I have this issue when working with Airflow (1.9 on GCP, driven by Composer).

This is my external library:

lib/
    __init__.py
    bb_airflow_utils.py

In the external library:

import json

def load_json_file(fname):
    # so that the DAG can see it
    with open(fname, 'r') as f:
        d = json.load(f)
    return d

In the main script:


from lib.bb_airflow_utils import *
ROOT_PATH = 'home/airflow/gcs/dags'
IDCLI = 'goofy'
...
...
with DAG(dag_id=dag_name, default_args=dag_args) as dag:
    filepath = path.join(ROOT_PATH, '{}-todwh.json'.format(IDCLI))
    get_data = load_json_file(filepath)
    .....
    task_dummy_start = DummyOperator(task_id='task_{}_start'.format(dag_name), dag=dag)
    .....

Airflow ignores the operator, and the UI says that the DAG has no SLAs.

Stefano G.

1 Answer


Have a look at https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies#install-local.

You can put common code in a separate file and place it in a separate folder, as in the example below.

Place the dependencies within a subdirectory in the dags/ folder. To import a module from a subdirectory, each subdirectory in the module's path must contain a __init__.py package marker file.

In this example, the dependency is coin_module.py:

dags/
  use_local_deps.py  # A DAG file.
  dependencies/
    __init__.py
    coin_module.py

Import the dependency from the DAG definition file.

For example:

from dependencies import coin_module
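
Applied to the layout in the question, the same pattern might look like the sketch below. It assumes the lib/ folder sits directly under dags/ next to the DAG file, and it imports json explicitly in the module, since the question's snippet does not show that import. The file location in the first comment is an assumption, not something the question confirms.

```python
# Assumed location: dags/lib/bb_airflow_utils.py (next to dags/lib/__init__.py)
import json


def load_json_file(fname):
    """Return the parsed contents of the JSON file at fname."""
    with open(fname, 'r') as f:
        return json.load(f)


# In the DAG file, an explicit import keeps the dependency visible,
# unlike a star import:
# from lib.bb_airflow_utils import load_json_file
```

Naming the imported function explicitly instead of using `import *` makes it easier to see which helpers the DAG file actually relies on.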
kaxil
  • Hi @kaxil, after your response I've added more details to the question. By the way, is it necessary for the directory to be named ***dependencies***? – Stefano G. Apr 03 '19 at 10:51
  • No, the name of the directory can be anything – kaxil Apr 04 '19 at 10:27
  • Also, change your ROOT_PATH to ROOT_PATH = '/home/airflow/gcs/dags' (with a leading slash) – kaxil Apr 04 '19 at 10:28
  • Thanks Kaxil, this was the problem! Now the code works. I did not see any error, only that the DAG had no SLAs. Thanks again – Stefano G. Apr 05 '19 at 15:11
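
The fix from the comments comes down to a relative versus an absolute path: without the leading slash, the path is resolved against whatever the process's current working directory happens to be. A minimal sketch of the difference follows. The helper names schema_path and dag_folder are hypothetical, and deriving the folder from the DAG file's own location is only one possible alternative to hard-coding the root.

```python
import os

ROOT_PATH_RELATIVE = 'home/airflow/gcs/dags'   # as written in the question
ROOT_PATH_ABSOLUTE = '/home/airflow/gcs/dags'  # as suggested in the comments


def schema_path(root, idcli):
    """Build the path of the '<idcli>-todwh.json' file under root."""
    return os.path.join(root, '{}-todwh.json'.format(idcli))


def dag_folder(dag_file):
    """Return the directory containing dag_file (pass __file__ from a DAG file)."""
    return os.path.dirname(os.path.abspath(dag_file))


# The relative root yields a path that depends on the current directory.
print(os.path.isabs(schema_path(ROOT_PATH_RELATIVE, 'goofy')))
# The absolute root always points at the same file.
print(os.path.isabs(schema_path(ROOT_PATH_ABSOLUTE, 'goofy')))
```

Inside a DAG file, `schema_path(dag_folder(__file__), IDCLI)` would avoid hard-coding the root at all, assuming the JSON files live in the same folder as the DAG file.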