
I use a ParallelFor over a dynamic list. I want to collect all the outputs from the loop and pass them to another ContainerOp.
Something like the following, which obviously does not work, since the outputs list will be static.

outputs = []
with dsl.ParallelFor(op1.output) as item:
    op2 = dsl.ContainerOp(
        name='op2',
        ...
        file_outputs={
            'outputs': '/outputs.json',
        })
    outputs.append(op2.output)


op3 = dsl.ContainerOp(
    name='op3',
    ...
    arguments=['--input', outputs]  # won't work
)
user3599803

3 Answers


Unfortunately, Ark-kun's solution did not work for me. But there is a simple way to implement a fan-in workflow if we know the number of inputs in advance: we can precompute the pipeline DAG, like this:

import kfp
from kfp import dsl


@kfp.components.create_component_from_func
def my_transformer_op(item: str) -> str:
    return item + "_NEW"


@kfp.components.create_component_from_func
def my_aggregator_op(items: list) -> str:
    return "HELLO"


def pipeline(array_of_arguments):
    # The Python for-loop is unrolled at compile time, so the resulting
    # DAG is static even though it is built from an ordinary Python list.
    @dsl.pipeline(PIPELINE_NAME, PIPELINE_DESCRIPTION)
    def dynamic_pipeline():
        outputs = []
        for i in array_of_arguments:
            outputs.append(my_transformer_op(str(i)).output)
        my_aggregator_op(outputs)
    return dynamic_pipeline

...

    run_id = client.create_run_from_pipeline_func(
        pipeline(data_samples_chunks), {},
        run_name=PIPELINE_RUN,
        experiment_name=PIPELINE_EXPERIMENT).run_id
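
For context, here is a minimal sketch of the elided driver code, assuming a reachable KFP endpoint; the constant values and sample data below are illustrative assumptions, not part of the original answer:

import kfp

# Illustrative constants; substitute your own values.
PIPELINE_NAME = "fan-in-demo"
PIPELINE_DESCRIPTION = "Fan-in over a precomputed list of inputs"
PIPELINE_RUN = "fan-in-run"
PIPELINE_EXPERIMENT = "fan-in-experiment"

# The inputs must be known before the pipeline is compiled.
data_samples_chunks = ["a", "b", "c"]

# Assumes kfp.Client() can reach your KFP API server
# (in-cluster, or configured with an explicit host).
client = kfp.Client()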

[Pipeline graph image]

Ezhik

I have run into issues with dynamic "fanning out" and then "fanning in" with Kubeflow Pipelines as well. It may be a little heavy-handed, but I used a mounted PVC to get around this.

Kubeflow allows you to mount a known PVC or create a new one on the fly using VolumeOp (a sketch of the on-the-fly variant follows the note below). This snippet shows how to use a known PVC.

    pvc_name = '<available-pvc-name>'
    pvc_volume_name = '<pvc-uuid>'  # pass the PVC uuid here

    # Op 1 creates a list to iterate over
    op_1 = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=['echo "[1,2,3]" > /tmp/output.txt'],
            file_outputs={'output': '/tmp/output.txt'})

    # Using withParam here to iterate over the results from op_1,
    # appending the result of each iteration to a shared file on the PVC.
    # Note: inside a Python f-string the Argo placeholder {{workflow.uid}}
    # has to be escaped as {{{{workflow.uid}}}}.
    with dsl.ParallelFor(op_1.output) as item:
        op_2 = dsl.ContainerOp(
            name='iterate',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[f"echo item-{item} > /tmp/output.txt; "                         # <- write this step's own output
                       f"mkdir -p /mnt/{{{{workflow.uid}}}}; "                         # <- make a run-scoped dir on the PVC
                       f"echo item-{item} >> /mnt/{{{{workflow.uid}}}}/results.txt"],  # <- append each result to a shared file
            file_outputs={'output': '/tmp/output.txt'},
            # mount the PVC
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)})

    op_3 = dsl.ContainerOp(
            name='aggregate',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            # collect everything the loop iterations appended; this is a plain
            # string, so {{workflow.uid}} needs no extra escaping here
            arguments=["cat /mnt/{{workflow.uid}}/results.txt > /tmp/output.txt"],
            # mount the PVC again to read the aggregated results
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)},
            file_outputs={'output': '/tmp/output.txt'}).after(op_2)

Ensure that op_3 runs only after all iterations of op_2 have finished by calling .after(op_2) at the end.

Note: this might be a heavy-handed approach, and there may be better solutions if KFP ever supports this directly in the compiler, but I couldn't get anything else to work. If it's easy to create a PVC in your environment, this approach might work for your case.
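
For the on-the-fly variant mentioned above, here is a minimal sketch using dsl.VolumeOp; the resource name and size are illustrative assumptions:

    # Create a PVC on the fly instead of reusing an existing claim.
    vop = dsl.VolumeOp(
            name='create-shared-volume',
            resource_name='fan-in-pvc',     # illustrative name
            size='1Gi',                     # illustrative size
            modes=dsl.VOLUME_MODE_RWM)      # ReadWriteMany, so parallel steps can share the mount

    # Then mount it in the ops above instead of the known PVC:
    #   pvolumes={"/mnt": vop.volume}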

santiago92
  • I was thinking of writing to shared storage; this does work, but I hoped for a more Kubeflow-oriented solution, as this forces the container to "know" that it is running in a loop – user3599803 Jan 01 '20 at 09:38
  • Unrelated note: please use `component.yaml` files instead of manually creating `ContainerOp` instances (see the sketch below). Components are just as easy to create, but are reusable and portable. Thanks. – Ark-kun Jun 19 '20 at 23:56
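
As a minimal sketch of what that last comment suggests, the per-item step above could be written as a reusable component. Everything below (the component name and the echo logic) is an illustrative assumption, not code from this thread; in practice the YAML would live in its own component.yaml file and be loaded with kfp.components.load_component_from_file:

import kfp.components as comp

# An illustrative reusable component definition, loaded inline for brevity.
echo_item_op = comp.load_component_from_text("""
name: Echo item
inputs:
- {name: item, type: String}
outputs:
- {name: output, type: String}
implementation:
  container:
    image: library/bash:4.4.23
    command:
    - sh
    - -c
    - |
      mkdir -p "$(dirname "$1")"   # KFP supplies the output path; create its parent dir
      echo "item-$0" > "$1"
    - {inputValue: item}
    - {outputPath: output}
""")

# echo_item_op can now be called like a function inside a pipeline,
# e.g. within dsl.ParallelFor: echo_item_op(item)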

The issue is that op3 is not properly referencing the output from op2 as an input argument. Try this:

op3 = dsl.ContainerOp(
    ...
    arguments=['--input', op2.outputs['outputs']]
)
Ekaba Bisong
  • Won't this reference only the last op2, since there are multiple of them? Or is there some Kubeflow magic here – user3599803 Dec 25 '19 at 08:03
  • Yes, I did see this in the documentation some time ago. I haven't checked it recently. However, I wrote a blog on this: https://ekababisong.org/kubeflow-for-poets/ – Ekaba Bisong Dec 25 '19 at 08:45
  • `invalid spec: templates.for-loop-for-loop-1a944bdb-1.outputs failed to resolve {{tasks.test-step-2.outputs.parameters.test-step-2-output}}` – LobsterMan Feb 20 '20 at 11:00