I am trying to read all the folders in a Google Cloud Storage bucket. Given a GCS bucket and a root folder path, I want to list every subfolder under that root and perform a task for each one.
from google.cloud import storage

def get_folders():
    BUCKET = 'bucket-name'
    PROJECT = 'project-name'
    path = 'root-path'
    client = storage.Client()
    bucket = client.bucket(BUCKET, PROJECT)
    # List every blob under the root path and derive the folder name
    # from each blob's path
    folder_blobs = client.list_blobs(BUCKET, prefix=path, delimiter='.')
    folder_names = []
    for blob in folder_blobs:
        folder_names.append(blob.name.split('/')[-2])
    folder_names.sort()
    return folder_names
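(Aside: since GCS has no real folders, the pattern the client library docs use for listing "subfolders" relies on delimiter='/' and reads them back as prefixes. A minimal sketch with placeholder names, in case my delimiter='.' approach is part of the problem:)

from google.cloud import storage

client = storage.Client()
# 'my-bucket' and 'root-path/' are placeholders
blobs = client.list_blobs('my-bucket', prefix='root-path/', delimiter='/')
for blob in blobs:
    pass  # prefixes is only populated once the iterator is consumed
folders = sorted(blobs.prefixes)  # e.g. ['root-path/sub-a/', 'root-path/sub-b/']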
And it does get the folders. However, the Airflow UI won't display the graph: if I click on the DAG name, it says "dag_name is not currently available". I can see completed tasks and their results are correct, but I cannot open the DAG itself.
I am at a loss and don't know what else to do. I do know that it fails when I try to iterate over the folder_blobs object.
Please, if anyone can help.
FYI, it all works in standalone Python.
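(If it helps with reproducing this: assuming Airflow 2.x, loading the file through DagBag should surface whatever error the scheduler hits at parse time; the path below is a placeholder.)

# Debugging sketch: load the dag folder the same way the scheduler does
# and print any import errors it records
from airflow.models import DagBag

dag_bag = DagBag(dag_folder='/path/to/dags', include_examples=False)
print(dag_bag.import_errors)  # {} if every file parsed cleanly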
The DAG file looks like this:
from google.cloud import storage
from datetime import datetime  # , timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable

def get_folders():
    BUCKET = 'MyBucket'
    PROJECT = 'MyProject'
    path = 'root_path'
    client = storage.Client()
    bucket = client.bucket(BUCKET, PROJECT)
    blobs = client.list_blobs(BUCKET, prefix=path, delimiter='.')
    folder_names = []
    for blob in blobs:
        folder_names.append(blob.name.split('/')[-2])
    folder_names.sort()
    return folder_names

args = {
    'owner': 'airflow',
}

PARENT_DAG_NAME = 'Get_Folders_v1.02'

dag = DAG(
    dag_id=PARENT_DAG_NAME,
    default_args=args,
    schedule_interval='@daily',
    catchup=False,
    start_date=datetime(2021, 1, 1),
)

start = DummyOperator(task_id='Start', dag=dag)
end = DummyOperator(task_id='End', dag=dag)

# This runs at module level, i.e. every time the scheduler parses the file
folders = get_folders()

for folder in folders:
    t1 = DummyOperator(
        task_id='Folder_{}_task'.format(folder),
        dag=dag
    )
    start >> t1 >> end
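Would something like this work instead? (A sketch only, untested: keep the GCS call out of module level by caching the folder list in an Airflow Variable, so the scheduler reads a cheap local value at parse time while a task refreshes it. Assumes the get_folders() and dag objects from the file above; 'gcs_folders' is a name I made up.)

import json
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

# At parse time, read the cached list instead of calling GCS
folders = json.loads(Variable.get('gcs_folders', default_var='[]'))

def refresh_folders():
    # Runs inside a worker, not during DAG parsing
    Variable.set('gcs_folders', json.dumps(get_folders()))

refresh = PythonOperator(
    task_id='Refresh_folder_list',
    python_callable=refresh_folders,
    dag=dag,
)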