
I am trying to pass metadata between Python function components by attaching it to an output artifact in a Vertex AI Kubeflow pipeline. From the documentation this seems straightforward, but try as I might I can't get it to work. I am trying to attach a string to an Output[Dataset] artifact in one component and then use it in the following component. An example:

This pipeline has two components, one to create a dataset and attach some metadata to it, another to receive the dataset artifact and then access the metadata.

I have tried with and without writing the data to a file.

from kfp.dsl import pipeline, component
from kfp.dsl import Input, Output, Dataset, Metrics, Model
from kfp import compiler, dsl

@component(packages_to_install=["pandas"], base_image='python:3.9')
def make_metadata(
    data: Output[Dataset],
):
    import pandas as pd
    param_out_df = pd.DataFrame({"dummy_col": "dummy_row"}, index=[0])
    param_out_df.to_csv(data.path, index=False)
    
    data.metadata["data_num"] = 1
    data.metadata["data_str"] = "random string"    
  
@component(packages_to_install=["pandas"], base_image='python:3.9')
def use_metadata(
    data: Input[Dataset],
):
    print("data - metadata")
    print(data.metadata)
    
@dsl.pipeline(
   name='test-pipeline',
   description='An example pipeline that performs arithmetic calculations.', 
   pipeline_root=f'{BUCKET}/pipelines'
)
def metadata_pipeline():
    metadata_made = make_metadata()
    
    used_metadata = use_metadata(data=metadata_made.outputs["data"])
    
PIPELINE_NAME = "test-pipeline"    
PIPELINE_FILENAME = f"{PIPELINE_NAME}.yaml"

compiler.Compiler().compile(
    pipeline_func=metadata_pipeline,
    package_path=PIPELINE_FILENAME,
)

The following code runs the pipeline YAML file created above on Vertex AI:

import datetime
from google.cloud import aiplatform

current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
test_run_name = f"{PIPELINE_NAME}_{current_time}"

aiplatform.init(project=PROJECT_ID, location=LOCATION)
job = aiplatform.pipeline_jobs.PipelineJob(
    display_name=test_run_name,
    template_path=PIPELINE_FILENAME
)
job.run(sync=False)

The installed kfp packages are as follows:

kfp==2.0.0b13
kfp-pipeline-spec==0.2.0
kfp-server-api==2.0.0a6

Not only can I not see the metadata in the print statement, but nothing I try makes it show up in the Vertex AI metadata lineage area either (sensitive values replaced with "xxx"):

{
  "name": "xxx",
  "displayName": "data",
  "instanceSchemaTitle": "system.Dataset",
  "uri": "xxx",
  "etag": "xxx",
  "createTime": "2023-03-17T10:52:10.040Z",
  "updateTime": "2023-03-17T10:53:01.621Z",
  "state": "LIVE",
  "schemaTitle": "system.Dataset",
  "schemaVersion": "0.0.1",
  "metadata": {}
}

Any help would be much appreciated. I realise I can pass the data in other ways, such as OutputPath, but conceptually I would prefer to attach it to the artifact, since the metadata relates to that item.
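For readers who need a stopgap along the lines of the "other ways" mentioned above, here is a minimal sketch of writing the metadata to a JSON sidecar file next to the artifact file. It does not explain or fix the empty `metadata` field. The helper names (`sidecar_path`, `write_sidecar_metadata`, `read_sidecar_metadata`) and the `.metadata.json` suffix are hypothetical, and the sketch assumes that `data.path` resolves to the same storage location in both components, which should be checked in your own environment.

```python
import json
import os
import tempfile


def sidecar_path(artifact_path: str) -> str:
    """Return the path of the (hypothetical) JSON sidecar for an artifact file."""
    return f"{artifact_path}.metadata.json"


def write_sidecar_metadata(artifact_path: str, metadata: dict) -> str:
    """Write metadata as JSON next to the artifact and return the sidecar path."""
    path = sidecar_path(artifact_path)
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(metadata, f)
    return path


def read_sidecar_metadata(artifact_path: str) -> dict:
    """Read the sidecar metadata, or return an empty dict if none exists."""
    path = sidecar_path(artifact_path)
    if not os.path.exists(path):
        return {}
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Local demonstration: a temporary directory stands in for data.path.
with tempfile.TemporaryDirectory() as tmp_dir:
    fake_artifact_path = os.path.join(tmp_dir, "data")
    write_sidecar_metadata(
        fake_artifact_path, {"data_num": 1, "data_str": "random string"}
    )
    loaded = read_sidecar_metadata(fake_artifact_path)

print(loaded)
```

Inside the components, the first would call `write_sidecar_metadata(data.path, {...})` after writing the CSV, and the second would call `read_sidecar_metadata(data.path)`. Because lightweight components are self-contained, the helpers would need to be defined inside each component function.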

I have also followed this guide to the letter, and it doesn't work either:

Vertex AI Pipelines: Lightweight Python function-based components, and component I/O

As above, I can't see the metadata attached in the preprocess component when I look at the lineage or try to access it in the next component:

output_dataset_one.metadata["hello"] = "there"

Gavin

1 Answer


I recently did something similar using NamedTuple. For example, the component I created to export a BigQuery table to GCS returns the destination folder and a list of the exported files' URIs as outputs.

Is this what you're looking for?

from typing import List, NamedTuple

from kfp.dsl import component


@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-storage",
        "google-auth",
        "google_cloud_pipeline_components",
    ],
    base_image="python:3.9",
)
def _export_bq_data_to_gcs(
    project_id: str,
    dataset_id: str,
    table_id: str,
    location: str,
    staging_bucket: str,
    pipeline_name: str,
    folder_name: str,
) -> NamedTuple("Outputs", destination_folder=str, data_files=List[str]):
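    from typing import List, NamedTuple

    from google.cloud import bigquery, storage

    # Field types must match the return annotation: data_files is a list of URIs.
    Outputs = NamedTuple("Outputs", destination_folder=str, data_files=List[str])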

    print(f"project_id: {project_id}")

    bq_client = bigquery.Client(project=project_id)
    destination_folder = f"{staging_bucket}/{pipeline_name}/{folder_name}"
    destination_uri = f"{destination_folder}/data*.csv"

    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_id)

    extract_job = bq_client.extract_table(table_ref, destination_uri, location=location)
    extract_job.result()

    print(
        f"Successfully Exported {project_id}:{dataset_id}.{table_id} to {destination_uri}"
    )

    storage_client = storage.Client(project=project_id)
    blobs = storage_client.list_blobs(
        staging_bucket.replace("gs://", ""), prefix=f"{pipeline_name}/{folder_name}"
    )
    data_files = [f"{staging_bucket}/{blob.name}" for blob in blobs]

    print(f"Dataset files exported: {data_files}")

    return Outputs(destination_folder=destination_folder, data_files=data_files)
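The return pattern used by the component above can be tried outside a pipeline. This is a minimal sketch in plain Python, with no KFP, BigQuery, or GCS involved. The names `ExportOutputs` and `build_export_outputs`, the bucket `gs://my-bucket`, and the blob name are hypothetical stand-ins chosen for illustration.

```python
from typing import List, NamedTuple


class ExportOutputs(NamedTuple):
    """Named outputs mirroring the two fields returned by the component above."""
    destination_folder: str
    data_files: List[str]


def build_export_outputs(
    staging_bucket: str,
    pipeline_name: str,
    folder_name: str,
    blob_names: List[str],
) -> ExportOutputs:
    """Assemble the outputs the same way the component does, without any cloud calls."""
    destination_folder = f"{staging_bucket}/{pipeline_name}/{folder_name}"
    data_files = [f"{staging_bucket}/{name}" for name in blob_names]
    return ExportOutputs(destination_folder=destination_folder, data_files=data_files)


# Hypothetical values standing in for a real export.
result = build_export_outputs(
    staging_bucket="gs://my-bucket",
    pipeline_name="test-pipeline",
    folder_name="export",
    blob_names=["test-pipeline/export/data000.csv"],
)

print(result.destination_folder)
print(result.data_files)
```

In a pipeline, a downstream component would typically receive these values as ordinary parameters, for example by passing `task.outputs["destination_folder"]` from the upstream task. Note that this passes values as parameters and does not attach them to an artifact's metadata.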
Shengy90