
This is my first time using Google's Vertex AI Pipelines. I checked this codelab as well as this post and this post, on top of some links derived from the official documentation. I decided to put all that knowledge to work on a toy example: a pipeline consisting of 2 components, "get-data" (which reads a .csv file stored in Cloud Storage) and "report-data" (which basically returns the shape of the .csv data read in the previous component). Furthermore, I was careful to include some suggestions provided in this forum. The code I currently have goes as follows:


from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

# Components section   

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage
    
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape


# Pipeline section

@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )

# Compilation section

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

# Running and submitting job

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)

run1.submit()

I was happy to see that the pipeline compiled with no errors, and I managed to submit the job. However, my happiness was short-lived: when I went to Vertex AI Pipelines, I stumbled upon the following error:

The DAG failed because some tasks failed. The failed tasks are: [get-data].; Job (project_id = my-project, job_id = 4290278978419163136) is failed due to the above error.; Failed to handle the job: {project_number = xxxxxxxx, job_id = 4290278978419163136}

I did not find any related info on the web, nor could I find any log or anything similar, and I feel a bit overwhelmed that the solution to this (seemingly) easy example is still eluding me.

Quite obviously, I don't know what I am doing wrong or where. Any suggestions?

  • If you got this error it means that your pipeline failed at `get_data()`. You can view the logs if you open GCP Console > Vertex AI > Pipelines, then select the pipeline that failed. Click the "get-data" step and click "View Logs". You should be able to see the runtime logs. When you find the logs, can you post the runtime logs on your question as well? – Ricco D Apr 22 '22 at 04:02
  • Can you also include your imports for the pipeline? – Ricco D Apr 22 '22 at 04:19
  • Hello @RiccoD. I have updated the imports as requested. Thanks for the hint regarding the location of the "Logs"; it was extremely helpful in troubleshooting my pipeline (I managed to make it work, and I will add my answer afterwards). However, I had to have the "Logs Viewer" (roles/logging.viewer) role enabled by the Project Owner, because at first, when trying to read the logs, I was greeted by the "Error: the caller does not have the permission" message, and after some research I discovered that the aforementioned role could help. – David Espinosa Apr 22 '22 at 18:11
  • @RiccoD, one last question: During the compilation of the pipeline however, I got the warning: "/home/jupyter/.local/lib/python3.7/site-packages/kfp/v2/compiler/compiler.py:1266: FutureWarning: APIs imported from the v1 namespace (e.g. kfp.dsl, kfp.components, etc) will not be supported by the v2 compiler since v2.0.0 category=FutureWarning". What would be the recommendation here? Thank you. – David Espinosa Apr 22 '22 at 18:13
  • I did not yet encounter that issue. It might be better to post a new question regarding the warning you encountered so the community can share their thoughts. – Ricco D Apr 25 '22 at 00:05

2 Answers


With some suggestions provided in the comments, I think I managed to make my demo pipeline work. I will first include the updated code:

from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from datetime import datetime
from google.cloud import aiplatform
from typing import NamedTuple


# Importing 'COMPONENTS' of the 'PIPELINE'

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    """Reads a csv file, from some location in Cloud Storage"""
    import ast
    import pandas as pd
    from google.cloud import storage
    
    # 'Pulling' demo .csv data from a known location in GCS
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # Reading the pulled demo .csv data
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
) -> NamedTuple("output", [("rows", int), ("columns", int)]):
    """From a passed csv file existing in Cloud Storage, returns its dimensions"""
    import pandas as pd
    
    df = pd.read_csv(inputd.path+".csv")
    
    return df.shape


# Building the 'PIPELINE'

# In my case PIPELINE_ROOT = 'gs://my-bucket/test_vertex/pipeline_root/';
# it can be overridden when submitting the pipeline.
PIPELINE_ROOT = "gs://my-bucket/test_vertex/pipeline_root/"

@pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="readcsv-pipeline",  # Your own naming for the pipeline.
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )
    

# Compiling the 'PIPELINE'    

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)


# Running the 'PIPELINE'

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={
        "url": "test_vertex/pipeline_root/program_grouping_data.zip",
        "bucket": "my-bucket"
    },
    enable_caching=True,
)

# Submitting the 'PIPELINE'

run1.submit()

Now, I will add some complementary comments which, taken together, are what solved my problem:

  • First, having the "Logs Viewer" (roles/logging.viewer) role enabled for your user will greatly help to troubleshoot any existing error in your pipeline (note: that role worked for me, however you might want to look for a role that better matches your own purposes here). Those errors will appear as "Logs", which can be accessed by clicking the corresponding button:

[Screenshot: the "View Logs" button for the failed pipeline step in the Vertex AI console]

  • NOTE: In the picture above, when the "Logs" are displayed, it might be helpful to carefully check each log (close to the time when you created your pipeline), as generally each of them corresponds to a single warning or error line:

[Screenshot: Vertex AI Pipelines logs]

  • Second, the output of my pipeline was a tuple. In my original approach, I just returned the plain tuple, but it is advised to return a NamedTuple instead. In general, if you need to input / output one or more "small values" (int or str, for any reason), pick a NamedTuple to do so.
  • Third, when the connection between your components is Input[Dataset] or Output[Dataset], adding the file extension is needed (and quite easy to forget). Take for instance the output of the get_data component, and notice how the data is recorded by specifically adding the file extension, i.e. dataset.path + ".csv" (see the small sketch right after this list).
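
To make the last two points more concrete, here is a stripped-down sketch of a consumer component; the component and field names are only illustrative and not part of the pipeline above:

from typing import NamedTuple
from kfp.v2.dsl import component, Dataset, Input

@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
)
def shape_of_csv(
    inputd: Input[Dataset],
) -> NamedTuple("ShapeOutput", [("rows", int), ("columns", int)]):
    """Reads the artifact that the producer saved as '<path>.csv' and returns its dimensions."""
    import pandas as pd

    # The producer wrote the file as dataset.path + ".csv", so the same
    # extension must be appended here when reading the artifact back.
    df = pd.read_csv(inputd.path + ".csv")
    rows, columns = df.shape
    return (rows, columns)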

Of course, this is a very tiny example, and real projects can easily grow much larger; however, as some sort of "Hello, Vertex AI Pipelines" it works well.

Thank you.

  • You can read directly from GCS: pd.read_csv('gs://bucket/your_path.csv'). https://stackoverflow.com/a/50201179/1117305 – AJ AJ Jan 09 '23 at 07:41
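
A minimal sketch of that direct-read approach, assuming the gcsfs package is available in the environment (the pipeline above does not install it) and reusing the illustrative bucket and object names from the question:

import pandas as pd

# pandas resolves gs:// URLs through gcsfs/fsspec, so no explicit
# google-cloud-storage download step is needed.
df = pd.read_csv(
    "gs://my-bucket/test_vertex/pipeline_root/program_grouping_data.zip",
    compression="zip",
)
print(df.shape)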

Thanks for your writeup. Very helpful! I had the same error, but it turned out to be for a different reason, so I'm noting it here... In my pipeline definition step I have the following parameters:

def my_pipeline(bq_source_project: str = BQ_SOURCE_PROJECT,
                bq_source_dataset: str = BQ_SOURCE_DATASET,
                bq_source_table: str = BQ_SOURCE_TABLE,
                output_data_path: str = "crime_data.csv"):

My error was that when I ran the pipeline, I did not pass these same parameters. Below is the fixed version:

job = pipeline_jobs.PipelineJob(
    project=PROJECT_ID,
    location=LOCATION,
    display_name=PIPELINE_NAME,
    job_id=JOB_ID,
    template_path=FILENAME,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={'bq_source_project': BQ_SOURCE_PROJECT,
                      'bq_source_dataset': BQ_SOURCE_DATASET,
                      'bq_source_table': BQ_SOURCE_TABLE},
)

Ken