
I want to write a Kubeflow Pipelines pipeline that has two components: A and B.

The output of A is a list of image paths.

I want to run a Docker image (B) once for each image path.

From what I can see, the dsl.ContainerOp for B can wait for the output of A, but I don't know how to create multiple instances of B.

RunOrVeith
asaf

1 Answer


Update: This has changed recently and can now be done simply by using ParallelFor over the output. Refer: https://stackoverflow.com/a/59292863/4438213
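A minimal sketch of that approach, assuming a KFP v1 SDK in which `dsl.ParallelFor` accepts a task output, and assuming component A emits its paths as a JSON-encoded list. The names `list_image_paths`, `process_image`, `fan_out_pipeline`, the sample paths and the base image are placeholders for illustration, not part of the original pipeline.

```python
def list_image_paths() -> str:
    """Stand-in for component A: returns the image paths as a JSON list string."""
    import json  # lightweight components need their imports inside the function

    paths = ["/data/img_0.png", "/data/img_1.png", "/data/img_2.png"]  # hypothetical
    return json.dumps(paths)


def process_image(path: str) -> str:
    """Stand-in for component B: handles exactly one image path."""
    print(f"processing {path}")
    return path


def build_pipeline():
    """Build the pipeline function. kfp is imported here so the two helpers
    above can be exercised locally without the SDK installed."""
    from kfp import components, dsl

    list_op = components.func_to_container_op(list_image_paths, base_image="python:3.7")
    process_op = components.func_to_container_op(process_image, base_image="python:3.7")

    @dsl.pipeline(name="fan-out-over-images")
    def fan_out_pipeline():
        paths_task = list_op()
        # One instance of B is created at run time per element of A's output.
        with dsl.ParallelFor(paths_task.output) as image_path:
            process_op(image_path)

    return fan_out_pipeline
```

The number of B instances is decided when the run executes, so nothing about the list length has to be known when the pipeline is compiled.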

----- Below for KF 0.6 and before ----

This is a recognized issue with the Kubeflow DSL: using the output of one component (A) and iterating over it, running a new component (B) for each entry in that output. It's hard because the DSL that Kubeflow uses is evaluated at compile time, and at that point it's not possible to know how many elements the output will contain.

Ref:

The only form of dynamic (run-time) iteration supported as of KF v0.6 is dsl-recursion. Pending work on the issues above, I've made it work in two ways:

If the size of A's result is known in advance and is the same in every run, this is straightforward.

CASE A: The size of the output from the previous step is known

  1. Create a lightweight component that returns the image path at a given index
# Python function that extracts one path from the
# string of refs that the previous step returns
def get_path(str_of_paths: str, idx: int) -> str:
    return str_of_paths.split(" ")[idx]  # or some other delimiter
  2. Wrap the Python function in a Kubeflow lightweight component
from kfp import components as comp
get_img_path_comp = comp.func_to_container_op(get_path, base_image='tensorflow/tensorflow')  # or any appropriate base image

A regular for loop in your pipeline DSL code then works:

image_path_res = ContainerOp_A()  # run your container op
for idx in range(4):  # 4 is the known, fixed number of paths
    path = get_img_path_comp(image_path_res.output, idx)
    ContainerOp_B(path.output)
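Before wrapping the helper in a component, it can be dry-run as ordinary Python to confirm that the delimiter and indices match what component A actually emits. This is only a local check; the sample string below is a made-up stand-in for A's output.

```python
def get_path(str_of_paths: str, idx: int) -> str:
    """Return the idx-th path from a delimiter-separated string of paths."""
    return str_of_paths.split(" ")[idx]


# Hypothetical output of component A: four space-separated paths.
sample_output = "/data/a.png /data/b.png /data/c.png /data/d.png"

# Same fixed-size loop as in the pipeline, run as ordinary Python.
selected = [get_path(sample_output, idx) for idx in range(4)]
print(selected)
```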

CASE B: When the output of the previous step is not of fixed size

This is trickier. The only form of dynamic looping Kubeflow allows as of KF v0.6 is dsl-recursion.

Option 1

  1. Create two lightweight components: sizer_op, which calculates the size of the result, and subtracter_op (shown in the next block), which decrements the counter. Reuse get_img_path_comp from above.
def sizer_op(str_of_refs: str) -> int:
    # split on the same delimiter that get_path uses
    return len(str_of_refs.split("|"))
sizer_op_comp = comp.func_to_container_op(sizer_op, base_image='tensorflow/tensorflow')

Then you can run the recursive function:

from kfp import dsl

def subtracter_op(cur_idx: int) -> int:
    return cur_idx - 1
sub_op_comp = comp.func_to_container_op(subtracter_op, base_image='tensorflow/tensorflow')

@dsl.graph_component
def recursive_run(list_of_images, cur_ref):
    # cur_ref counts the images still to process; stop when it reaches 0
    with dsl.Condition(cur_ref > 0):
        # decrement first so the index stays within 0..size-1
        next_ref = sub_op_comp(cur_ref)
        path = get_img_path_comp(list_of_images, next_ref.output)
        ContainerOp_B(path.output)

        # call recursively with the decremented counter
        recursive_run(list_of_images, next_ref.output)


image_path_res = ContainerOp_A()  # run your container op
sizer = sizer_op_comp(image_path_res.output)
recursive_run(image_path_res.output, sizer.output)
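To see which indices a countdown recursion like this visits, it can be traced in plain Python. The sketch below is an analogue rather than pipeline code: it assumes the counter starts at the number of paths and is decremented before being used as an index, since indexing with the size itself would read one past the end. `recursive_trace` and the sample string are hypothetical.

```python
from typing import List


def recursive_trace(paths: str, remaining: int, visited: List[str]) -> List[str]:
    """Plain-Python analogue of the recursive graph component."""
    if remaining > 0:                           # role of dsl.Condition
        idx = remaining - 1                     # role of the subtracter component
        visited.append(paths.split("|")[idx])   # role of the get-path step plus B
        recursive_trace(paths, idx, visited)    # recurse with the smaller counter
    return visited


sample = "a.png|b.png|c.png"                    # hypothetical output of A
size = len(sample.split("|"))                   # what the sizer component computes
print(recursive_trace(sample, size, []))
```

The paths are visited from last to first, and the recursion ends once the counter reaches zero.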

Option 2

After running ContainerOp_A, create a Kubeflow component that reads the results from ContainerOp_A, parses them in Python, and then uses the KFP client to spawn new runs that run just ContainerOp_B. You can connect to the KF Pipelines client using:

from kfp import Client
kf_client = Client(host='localhost:9990')

Refer: kf_client
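A rough sketch of the spawning step, under several assumptions: A's result is a JSON list of paths, a pipeline containing only B has already been compiled to a package file, and that pipeline takes a parameter named `image_path`. The function name `spawn_runs_for_paths`, the parameter name and the run names are hypothetical. The client is passed in, and the only client call used is `create_run_from_pipeline_package` from the KFP v1 SDK.

```python
import json
from typing import Any, Dict, List


def spawn_runs_for_paths(client: Any, raw_output: str, package_path: str) -> List[Dict[str, str]]:
    """Submit one run of a B-only pipeline per image path in raw_output."""
    submitted = []
    for idx, path in enumerate(json.loads(raw_output)):
        arguments = {"image_path": path}  # hypothetical pipeline parameter name
        client.create_run_from_pipeline_package(
            pipeline_file=package_path,
            arguments=arguments,
            run_name=f"component-b-{idx}",
        )
        submitted.append(arguments)
    return submitted
```

With a real cluster, `client` would be the `Client(host=...)` object created above. Note that the spawned runs are separate pipeline runs, so they do not appear as steps of the parent run.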

santiago92
  • `cur_ref - 1`. You cannot do this. There is no agent that will be able to execute that subtraction at runtime. You should get a compile error that you cannot do `PipelineParam - 1` – Ark-kun Jan 09 '20 at 23:18
  • Thanks for catching that @Ark-kun! I forgot to mention I use a Python Lightweight component for this. Also updated the answer for latest KFP versions that can do this with `ParallelFor` – santiago92 Jan 10 '20 at 10:32