8

I'm doing some experimentation with Kubeflow Pipelines and I'm interested in retrieving the run id to save along with some metadata about the pipeline execution. Is there any way I can do so from a component like a ContainerOp?

Ark-kun
DSF

6 Answers

11

You can use kfp.dsl.EXECUTION_ID_PLACEHOLDER and kfp.dsl.RUN_ID_PLACEHOLDER as arguments for your component. At runtime they will be replaced with the actual values.
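On the component side, the substituted value arrives as an ordinary string argument. A minimal sketch, assuming the component is a Python script that receives the run id through a hypothetical `--run-id` flag and writes it to a hypothetical JSON file (neither name comes from KFP itself):

```python
import argparse
import json
from pathlib import Path
from typing import Optional


def save_run_metadata(run_id: str, out_path: str, extra: Optional[dict] = None) -> dict:
    """Write the run id plus any extra metadata to a JSON file and return the record."""
    record = {"run_id": run_id, **(extra or {})}
    Path(out_path).write_text(json.dumps(record, indent=2))
    return record


def main(argv=None) -> dict:
    parser = argparse.ArgumentParser()
    # --run-id and --out are hypothetical flag names chosen for this sketch.
    parser.add_argument("--run-id", required=True)
    parser.add_argument("--out", default="run_metadata.json")
    args = parser.parse_args(argv)
    return save_run_metadata(args.run_id, args.out)


if __name__ == "__main__":
    main()
```

In the pipeline definition you would then pass kfp.dsl.RUN_ID_PLACEHOLDER as the value of `--run-id`.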

Ark-kun
  • 2
    Thanks. How are these two different? – Ben Nov 26 '20 at 19:08
  • 3
    RUN_ID is for the whole pipeline run while the EXECUTION_ID is for a single component task execution. It might be better to just use a random number generation component though. – Ark-kun Nov 28 '20 at 10:04
4

I tried to do this with the Python DSL, but it does not seem to be possible right now.

The only option I found is the method used in this sample code: you declare a string containing {{workflow.uid}}, and it is replaced with the actual value at execution time.

The same approach works for the pod name, using {{pod.name}}.

Gabriel Bessa
2

Since Kubeflow Pipelines relies on Argo, you can use Argo variables to get what you want.

For example,

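import kfp.dsl as dsl
from kfp.components import func_to_container_op

@func_to_container_op
def dummy(run_id: str, run_name: str) -> str:
    # Return both values as one string so the result matches the declared type.
    return f"{run_id} {run_name}"

@dsl.pipeline(
    name='test_pipeline',
)
def test_pipeline():
    # Argo substitutes these placeholders when the workflow runs.
    dummy('{{workflow.labels.pipeline/runid}}', '{{workflow.annotations.pipelines.kubeflow.org/run_name}}')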

You will find that the placeholders will be replaced with the correct run_id and run_name.

For more argo variables: https://github.com/argoproj/argo-workflows/blob/master/docs/variables.md

To see what is recorded in the labels and annotations of a Kubeflow pipeline run, get the corresponding workflow from Kubernetes:

kubectl get workflow/XXX -oyaml
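If you would rather inspect the result programmatically, the same manifest can be requested as JSON (`kubectl get workflow/XXX -o json`) and read with a few lines of Python. A minimal sketch, assuming the manifest carries the usual `metadata.labels` and `metadata.annotations` maps; the sample values below are made up for illustration:

```python
import json


def run_info_from_workflow(manifest: dict) -> dict:
    """Pull the run id label and run name annotation out of a workflow manifest."""
    metadata = manifest.get("metadata", {})
    labels = metadata.get("labels") or {}
    annotations = metadata.get("annotations") or {}
    return {
        "run_id": labels.get("pipeline/runid"),
        "run_name": annotations.get("pipelines.kubeflow.org/run_name"),
    }


# Hypothetical manifest fragment; a real one comes from kubectl.
sample = json.loads("""
{
  "metadata": {
    "labels": {"pipeline/runid": "example-run-id"},
    "annotations": {"pipelines.kubeflow.org/run_name": "example run"}
  }
}
""")

print(run_info_from_workflow(sample))
```

Missing keys come back as `None`, which makes it easy to spot a run where the label or annotation is not set.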
Jack Lin
  • 1
    Neither `workflow.labels` nor `workflow.annotations` seem to work for me. When attempting to start a run, I'm getting `Internal error: (...) failed to resolve {{workflow.labels.pipeline/runid}}` and the run doesn't even start. Other variables listed in `variables.md` seem to work fine though. I'm on `kfp==1.8.18`, KF 1.6 and Argo v3.3.8. – nichoio Feb 14 '23 at 12:40
  • same error for me – gebbissimo Mar 16 '23 at 08:29
1

Your component's container should have an environment variable called HOSTNAME set to its unique pod name, from which you can derive the metadata you need.
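A minimal sketch of reading that variable from inside the component, assuming HOSTNAME is set as described; the `None` fallback is an addition of this sketch to cover running the code outside a pod:

```python
import os
from typing import Optional


def current_pod_name() -> Optional[str]:
    """Return the pod name from HOSTNAME, or None if the variable is unset or empty."""
    return os.environ.get("HOSTNAME") or None
```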

1

create_run_from_pipeline_func returns a RunPipelineResult, which has a run_id attribute:

import kfp

client = kfp.Client(host)  # host is the address of your KFP endpoint
result = client.create_run_from_pipeline_func(…)
run_id = result.run_id
Tonechas
1

For V1:

It seems like kfp.dsl.EXECUTION_ID_PLACEHOLDER (component run id) and kfp.dsl.RUN_ID_PLACEHOLDER (pipeline run id) will accomplish what you're asking.

source

For V2:

The equivalents are kfp.v2.dsl.PIPELINE_TASK_ID_PLACEHOLDER (component run id) and kfp.v2.dsl.PIPELINE_JOB_ID_PLACEHOLDER (pipeline run id).

source

  • Unfortunately for v2 `kfp.v2.dsl.PIPELINE_JOB_ID_PLACEHOLDER` still doesn't work. You still need to use `{{workflow.uid}}` – sumit dugar Aug 31 '23 at 13:44