I need to generate some DAGs dynamically. I've saved the JSON table-schema files in a GCS bucket; the bucket associated with Composer is mapped to /home/airflow/gcs/dags/ on the workers. If I define the method that reads the JSON file in the same script that creates the DAG, everything works fine. But if I move that "common code" into a library of mine, the library code cannot access the filesystem; in particular, I can't use the Python json library.
The strange thing is that I define the method outside of the DAG-creation step, and I invoke it only after the DAG has been created.
To complete the picture, I have no problems if the library code uses only in-memory objects.
I hit this issue with Airflow 1.9 on GCP, managed by Composer.
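For what it's worth, the reader function itself behaves correctly outside Composer; here is a minimal self-contained check (the schema contents and file name are made up for the test):

```python
import json
import os
import tempfile

def load_json_file(fname):
    # open the file and parse its JSON content into a dict
    with open(fname, 'r') as f:
        return json.load(f)

# Self-check against a throwaway file in a temp directory.
schema = {"table": "goofy_todwh", "columns": ["id", "name"]}
tmp = os.path.join(tempfile.mkdtemp(), "goofy-todwh.json")
with open(tmp, 'w') as f:
    json.dump(schema, f)

print(load_json_file(tmp) == schema)
```

So the problem does not seem to be the function itself, but where and how it runs inside Composer.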
This is my external library:
lib/
__init__.py
bb_airflow_utils.py
In the external library:
import json

def load_json_file(fname):
    # defined here so that the DAG can see it
    with open(fname, 'r') as f:
        d = json.load(f)
    return d
In the main script:
from os import path
from lib.bb_airflow_utils import *
ROOT_PATH = '/home/airflow/gcs/dags'
IDCLI = 'goofy'
...
...
with DAG(dag_id=dag_name, default_args=dag_args) as dag:
    filepath = path.join(ROOT_PATH, '{}-todwh.json'.format(IDCLI))
    get_data = load_json_file(filepath)
    .....
    task_dummy_start = DummyOperator(task_id='task_{}_start'.format(dag_name), dag=dag)
    .....
Airflow ignores the operators, and the UI says that the DAG has no SLA.
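One pattern that can sidestep path problems when the scheduler parses DAG files from an unexpected working directory: resolve the DAGs folder from the DAG file's own location instead of a hard-coded string. This is only a sketch of an idea I'd try, not a confirmed fix; the helper name and the example DAG path are mine:

```python
import os

def resolve_in_dags_folder(dag_file, name):
    # Build an absolute path to a file sitting next to the DAG file itself;
    # inside a DAG script you would pass __file__ as dag_file.
    return os.path.join(os.path.dirname(os.path.abspath(dag_file)), name)

# Hypothetical DAG file location, mirroring the Composer mount point.
filepath = resolve_in_dags_folder('/home/airflow/gcs/dags/my_dag.py',
                                  'goofy-todwh.json')
print(os.path.isabs(filepath))
```

This way the JSON path stays correct regardless of the process's current working directory.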