Previous Issues
This issue has been reported before here, here, and here; however, I suspect that in my case it is caused by a call to Google Cloud Storage.
Premise/Problem
The following code is placed in the DAG folder for the Google Cloud Composer instance.
This block of code "dynamically" generates DAGs based on a list of strings, and it works successfully, generating two DAGs named "new_dummy_ohhel" and "new_dummy_hello". These DAGs are accessible and run without issue.
import datetime as dt

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from google.cloud.storage import Client as Client_Storage

list_strings = ["ohhello", "hellothere"]
# ^ Variable of importance

for string_val in list_strings:
    name_dag = f"new_dummy_{string_val[:5]}"
    # ^ Just get the first 5 characters (this is important later)

    dag = DAG(
        name_dag,
        default_args={
            "owner": "airflow",
            "start_date": dt.datetime.now() - dt.timedelta(days=1),
            # ^ yesterday
            "depends_on_past": False,
            "email": [""],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": dt.timedelta(minutes=5),
        },
        schedule_interval=None,
    )

    with dag:
        op_dummy_start = DummyOperator(task_id="new-dummy-task-start")
        op_dummy_end = DummyOperator(task_id="new-dummy-task-end")
        op_dummy_start >> op_dummy_end

    # Register the DAG at module level so Airflow's loader picks it up
    globals()[name_dag] = dag
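For reference, one way to sanity-check that the loop registers both DAGs is Airflow's own DagBag loader (just a sketch; it has to run somewhere that sees the same DAGs folder):

from airflow.models import DagBag

# The webserver shows "seems to be missing" when a DAG is absent from its
# DagBag, so any import error surfaced here is a useful clue.
dagbag = DagBag()
print(dagbag.dag_ids)        # expect 'new_dummy_ohhel' and 'new_dummy_hello'
print(dagbag.import_errors)  # maps file path -> error for files that failed to parse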
Now, what is problematic and strange is that when a simple call to Google Cloud Storage supplies the list of strings instead, the DAGs are still created, but they produce an error when accessed. If the variable list_strings is replaced with the following, the error occurs:
list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
Assuming the bucket "some_bucket_name" contains the files "something.json" and "omgwhy.json", two inaccessible and non-executable DAGs are created: "new_dummy_somet" and "new_dummy_omgwh". (Only the first five characters are taken so that no "." ends up in the DAG name.) This indicates that the call to storage was successful, and yet the DAG still "seems to be missing".
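To make the truncation concrete (an illustration only, not part of the DAG file):

[f"new_dummy_{name[:5]}" for name in ["something.json", "omgwhy.json"]]
# -> ['new_dummy_somet', 'new_dummy_omgwh']  (the "." never makes it into the ID)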
Even if the code immediately overwrites that list, as below, the error that the DAG "seems to be missing" still appears (note that the DAGs will be "new_dummy_ohhel" and "new_dummy_hello"):
list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
list_strings = ["ohhello", "hellothere"]
tl;dr & Hypothesis
When any call to Google Cloud Storage is made in the file (including a successful one whose result is never used), all DAGs created in the file still appear, but each one reports that DAG "<whatever dag name here>" seems to be missing.
I'm dumbfounded as to why a successful call causes this problem.
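If the hypothesis is right and it is one particular parsing context (e.g. the webserver's) that fails on the storage call, then a guarded version of the call should at least keep the file importable there. A sketch (the hardcoded fallback is just a stand-in for whatever default makes sense):

try:
    list_strings = [
        x.name
        for x in Client_Storage().bucket("some_bucket_name").list_blobs()
    ]
except Exception:
    # Hypothetical fallback: if this parsing context cannot reach the
    # bucket, keep the file importable so the DAGs do not vanish.
    list_strings = ["ohhello", "hellothere"]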
Solutions Tried
- Tried restarting the entire Google Cloud Composer instance
- Tried restarting the Airflow webserver by adding environment variables (changing a Composer environment's variables forces the webserver to restart)