
I am trying to read all the folders in a Google Cloud Storage bucket. I have a GCS bucket/root folder path, and I want to read all subfolders under that root folder and perform a task for each folder.

from google.cloud import storage

def get_folders():
    BUCKET = 'bucket-name'
    PROJECT = 'project-name'
    path = 'root-path'
    client = storage.Client()
    bucket = client.bucket(BUCKET, PROJECT)
    folder_blobs = (client.list_blobs(BUCKET, prefix=path, delimiter='.'))
    folder_names = []
    for blob in folder_blobs:
        folder_names.append(blob.name.split('/')[-2])
    folder_names.sort()
    return folder_names
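
For reference, the storage client can also report "folders" directly: listing with delimiter='/' rolls sub-paths up into the iterator's prefixes attribute instead of returning every blob. A minimal sketch of that approach, with placeholder bucket and path names:

from google.cloud import storage

def get_folders_via_prefixes(bucket_name='bucket-name', path='root-path/'):
    client = storage.Client()
    # With delimiter='/', anything "below" the delimiter is rolled up
    # into the iterator's prefixes set rather than returned as blobs.
    blobs = client.list_blobs(bucket_name, prefix=path, delimiter='/')
    list(blobs)  # consume the iterator so .prefixes is populated
    return sorted(blobs.prefixes)  # e.g. ['root-path/sub1/', 'root-path/sub2/']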

And it does get the folders, but the Airflow UI won't display the graph. If I try to click on the DAG name, it says dag_name is not currently available.

I can see completed tasks and the results are correct, but I cannot open the DAG.

I am at a loss and don't know what else to do. I know that it fails when I try to use the folder_blobs object.

If anyone can help, I would appreciate it.

FYI, it all works in standalone Python.

The DAG file looks like this:

from google.cloud import storage

from datetime import datetime #, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

from airflow.models import Variable

def get_folders():
    BUCKET = 'MyBucket'
    PROJECT = 'MyProject'
    path = 'root_path'
    client = storage.Client()
    bucket = client.bucket(BUCKET, PROJECT)
    blobs = (client.list_blobs(BUCKET, prefix=path, delimiter='.'))1
    folder_names = []
    for blob in blobs:
        folder_names.append(blob.name.split('/')[-2])
    folder_names.sort()
    return folder_names

args = {
    'owner': 'airflow',
}

PARENT_DAG_NAME = 'Get_Folders_v1.02'

dag = DAG(
    dag_id=PARENT_DAG_NAME,
    default_args=args,
    schedule_interval='@daily', 
    catchup=False,
    start_date=datetime(2021, 1, 1),
)

start = DummyOperator(task_id='Start', dag=dag)
end = DummyOperator(task_id='End', dag=dag)
for folder in folders:
    t1 = DummyOperator(
        task_id='Folder_{}_task'.format(folder),
        dag=dag
    )

start >> t1 >> end

1 Answer


I tried to use your code in my setup and I had to change it, since it errors out on a few lines, such as for folder in folders: (the folders object is not declared). Also, you have an extra '1' at the end of the line blobs = (client.list_blobs(BUCKET, prefix=path, delimiter='.'))1

I removed the loop and changed DummyOperator to PythonOperator to use your get_folders() function to return the folder_names. By the way, I ran Airflow using Cloud Composer; I followed this quick start to set up the Cloud Composer environment.

from google.cloud import storage
from datetime import datetime #, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import PythonOperator

from airflow.models import Variable

def get_folders():
    BUCKET = 'my-test-bucket'
    PROJECT = 'my-project'
    path = '25868628' #just a random folder in my bucket
    client = storage.Client()
    bucket = client.bucket(BUCKET, PROJECT)
    blobs = (client.list_blobs(BUCKET, prefix=path, delimiter='.'))
    folder_names = []
    for blob in blobs:
        folder_names.append(blob.name.split('/')[-2])
    folder_names.sort()
    return folder_names

args = {
    'owner': 'airflow',
}

PARENT_DAG_NAME = 'Get_Folders_v1.02'

dag = DAG(
    dag_id=PARENT_DAG_NAME,
    default_args=args,
    schedule_interval='@daily',
    catchup=False,
    start_date=datetime(2021, 1, 1),
)

start = DummyOperator(task_id='Start', dag=dag)
end = DummyOperator(task_id='End', dag=dag)
t1 = PythonOperator(
        task_id='get_folders',
        python_callable=get_folders,
        dag=dag,
        )

start >> t1 >> end
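
Side note: when a PythonOperator callable returns a value, Airflow pushes it to XCom (which is why the logs below show "Returned value was: ..."). A downstream task could pull that list roughly like this; this is only a sketch assuming Airflow 1.10.x as in the logs, and use_folders/t2 are illustrative names:

from airflow.operators.python_operator import PythonOperator

def use_folders(**context):
    # Pull the list returned by the get_folders task from XCom.
    folders = context['ti'].xcom_pull(task_ids='get_folders')
    print(folders)

t2 = PythonOperator(
    task_id='use_folders',
    python_callable=use_folders,
    provide_context=True,  # Airflow 1.x: pass the context (incl. ti) to the callable
    dag=dag,
)

t1 >> t2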

DAG file in Airflow: (screenshot)

Graph View: (screenshot)

Tree View: (screenshot)

Output logs:

[2021-01-07 03:40:06,448] {base_task_runner.py:113} INFO - Job 19: Subtask get_folders Running <TaskInstance: Get_Folders_v1.02.get_folders 2021-01-06T00:00:00+00:00 [running]> on host airflow-worker-c985959c8-6fv56
[2021-01-07 03:40:07,164] {base_task_runner.py:113} INFO - Job 19: Subtask get_folders [2021-01-07 03:40:07,163] {python_operator.py:114} INFO - Done. Returned value was: ['25868628', 'test_images']
[2021-01-07 03:40:07,267] {taskinstance.py:1066} INFO - Marking task as SUCCESS.dag_id=Get_Folders_v1.02, task_id=get_folders, execution_date=20210106T000000, start_date=20210107T033959, end_date=20210107T034007
[2021-01-07 03:40:07,267] {base_task_runner.py:113} INFO - Job 19: Subtask get_folders [2021-01-07 03:40:07,267] {taskinstance.py:1066} INFO - Marking task as SUCCESS.dag_id=Get_Folders_v1.02, task_id=get_folders, execution_date=20210106T000000, start_date=20210107T033959, end_date=20210107T034007
– Ricco D
  • Thank you, I realized. In my own example, for testing purposes, I had a call to the function outside of an Operator. – tadas marcinkevicius Jan 07 '21 at 16:25
  • I was trying to use that folders variable to create tasks. From your example, how can I use those folders to create a sequence of tasks for each folder? – tadas marcinkevicius Jan 07 '21 at 16:32
  • @tadasmarcinkevicius It seems that your follow up question is no longer related to your original question. It is best to create a separate post about it, though I found this [SO post](https://stackoverflow.com/questions/41517798/proper-way-to-create-dynamic-workflows-in-airflow) that might give you an idea on how to implement the solution. – Ricco D Jan 08 '21 at 05:08
  • As I stated in my original question, I am trying to create a separate task for EACH found folder. Mr. Ricco, your answer just reads the folders. I am aware that my Python code is correct. Thank you for the link. It does help. – tadas marcinkevicius Jan 08 '21 at 16:51
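
Following up on that last comment: the dynamic-workflow pattern from the linked post, applied to this DAG, would look roughly like the sketch below. This is only a sketch under the assumption that generating tasks at parse time is acceptable; get_folders() would then call GCS every time the scheduler parses the file, which is exactly the kind of top-level work that can make a DAG slow to load or show up as unavailable in the UI.

# Sketch: one task per folder, generated at DAG parse time.
# Caveat: folder names must be valid task_id characters
# (alphanumerics, dashes, dots and underscores).
folders = get_folders()

start = DummyOperator(task_id='Start', dag=dag)
end = DummyOperator(task_id='End', dag=dag)

for folder in folders:
    t = DummyOperator(
        task_id='Folder_{}_task'.format(folder),
        dag=dag,
    )
    start >> t >> end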