Previous Issues

This issue has been reported before here, here, and here; however, I suspect that in my case it is caused by a call to Google Cloud Storage.

Premise/Problem

The following code is placed in the DAG folder for the Google Cloud Composer instance.

The following block of code "dynamically" generates DAGs based on a list of strings and works as expected, producing two DAGs named "new_dummy_ohhel" and "new_dummy_hello". These DAGs are accessible and run correctly.

import datetime as dt

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from google.cloud.storage import Client as Client_Storage


list_strings = ["ohhello", "hellothere"]
# ^ Variable of importance

for string_val in list_strings:

    name_dag = f"new_dummy_{string_val[:5]}"
    # Just get the first 5 characters (this is important later)

    dag = DAG(
        name_dag,
        default_args={
            "owner": "airflow",
            "start_date": dt.datetime.now() - dt.timedelta(days=1),
            # ^ yesterday
            "depends_on_past": False,
            "email": [""],
            "email_on_failure": False,
            "email_on_retry": False,
            "retries": 1,
            "retry_delay": dt.timedelta(minutes=5),
        },
        schedule_interval=None,
    )

    with dag:
        op_dummy_start = DummyOperator(task_id="new-dummy-task-start")
        op_dummy_end = DummyOperator(task_id="new-dummy-task-end")

        op_dummy_start >> op_dummy_end

    globals()[name_dag] = dag

Now, the problematic and strange part: when a simple call to Google Cloud Storage replaces the hard-coded list of strings, the DAGs are still created, but accessing them produces an error. If the variable list_strings is replaced with the following, the error occurs:

list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]

Assuming the bucket "some_bucket_name" contains the files "something.json" and "omgwhy.json", two inaccessible, non-executable DAGs will be created: "new_dummy_somet" and "new_dummy_omgwh". (Only the first five characters are taken so that the "." of the file extension does not end up in the DAG id.) This shows that the call to storage succeeded, and yet each DAG is reported as "seems to be missing".
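
For reference, a quick illustration of how those blob names become DAG ids:

# Slicing to five characters keeps the "." of the file extension out of the DAG id
"something.json"[:5]  # "somet" -> DAG "new_dummy_somet"
"omgwhy.json"[:5]     # "omgwh" -> DAG "new_dummy_omgwh"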

Even if the code immediately overwrites that list, as in the following, the DAG "seems to be missing" error still appears (note that the DAGs will again be "new_dummy_ohhel" and "new_dummy_hello"):

list_strings = [
    x.name
    for x in Client_Storage().bucket("some_bucket_name").list_blobs()
]
list_strings = ["ohhello", "hellothere"]

tl;dr & Hypothesis

When any call to Google Cloud Storage is made in the file (even a successful one whose result is never used), all DAGs created in the file appear in the web UI, but opening them shows DAG "<whatever dag name here>" seems to be missing. I'm dumbfounded as to why a successful call causes this problem.

Solutions Tried

  • Tried restarting the entire Google Cloud Composer instance
  • Tried restarting the Airflow webserver by adding environment variables
Clayton J Roberts

1 Answer


The managed web server in Cloud Composer runs under a different service account than your environment's main worker machines (which use the service account specified during environment creation). This means that if you intend to read from buckets other than your environment's main bucket, you will need to grant the web server's service account similar ACLs, or the web server will not be able to read from the bucket.

In your case, it is possible that the Airflow scheduler can read the bucket (using the environment's service account), but the web server cannot. The scheduler will create an entry for the DAG, but if the web server cannot parse the definition file without encountering an exception, you would receive "DAG x is missing".
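
If that is what is happening here, the web-server-side failure presumably looks something like the sketch below; the specific Forbidden (403) exception is an assumption. The key point is that the exception escapes module-level code, so the whole file fails to import on the web server and even hard-coded DAGs in the same file never reach its DagBag, which matches the "overwrite" experiment above.

# Sketch of the suspected web-server-side failure (the 403 is an assumption):
# iterating list_blobs() raises when the caller lacks storage.objects.list on
# the bucket, and because this happens at import time the whole DAG file fails
# to load for the web server, while the scheduler (with a different service
# account) parses it successfully.
from google.api_core.exceptions import Forbidden
from google.cloud.storage import Client as Client_Storage

try:
    blob_names = [x.name for x in Client_Storage().bucket("some_bucket_name").list_blobs()]
except Forbidden as exc:
    # In the real DAG file nothing catches this, so the import aborts here.
    print(f"web server cannot list the bucket: {exc}")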

If the above sounds right to you, you can fix this by adjusting the Cloud Storage bucket ACL, or enabling DAG serialization. Serialization removes the web server's need to parse/execute definition files and leaves it all to the scheduler, so it could also fix your issue.
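
For the ACL route, here is a minimal sketch using the same storage client as the question, assuming you have looked up the email of the service account the web server runs as (the address below is a placeholder). Note that bucket/object ACLs only apply if uniform bucket-level access is disabled; otherwise grant an IAM role such as roles/storage.objectViewer instead.

# Sketch: grant the Composer web server's service account read access to the bucket.
# The email is a placeholder -- substitute the actual service account of the managed web server.
from google.cloud.storage import Client as Client_Storage

bucket = Client_Storage().bucket("some_bucket_name")
acl = bucket.acl
acl.user("webserver-sa@example-tenant-project.iam.gserviceaccount.com").grant_read()
acl.save()

The DAG serialization alternative is configured through Airflow configuration overrides rather than code; see the Composer documentation linked in the comment below.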

hexacyanide
  • For someone quickly looking for the solution, setting these in your Airflow config overrides fixed my issue: https://cloud.google.com/composer/docs/dag-serialization#enable – Clayton J Roberts Mar 22 '21 at 14:18