5

What is a good pattern for aggregating the results from Kubeflow Pipeline kfp.ParallelFor?

Jet Basrawi

2 Answers

1

Not exactly what you asked for, but our workaround was to have each ParallelFor task write its result to S3 and then collect the results in a postprocessing task that runs after the loop.

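# One forecasting task per item produced by the preprocessing step.
with dsl.ParallelFor(preprocessing_task.output) as plant_item:
    predict_plant = '{}'.format(plant_item)
    forecasting_task = forecasting_op(predict_plant, ....).after(preprocessing_task)
# Outside the loop: a single task that collects what the loop tasks wrote.
postprocessing_task = postprocessing_op(...).after(forecasting_task)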
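The write-then-collect idea can be sketched without a cluster. The following is only an illustration under stated assumptions, not KFP code: a temporary local directory stands in for the S3 location, and `forecasting_step` and `postprocessing_step` are hypothetical stand-ins for the bodies of `forecasting_op` and `postprocessing_op`. Each iteration writes one file named after its item, and the collecting step simply lists the shared location.

```python
import json
import tempfile
from pathlib import Path


def forecasting_step(plant: str, out_dir: Path) -> None:
    """Hypothetical body of one loop iteration: write one result file per item."""
    # Placeholder value only; a real task would store its actual forecast here.
    result = {"plant": plant, "forecast": len(plant)}
    (out_dir / f"{plant}.json").write_text(json.dumps(result))


def postprocessing_step(out_dir: Path) -> list:
    """Hypothetical body of the task after the loop: read every result file back."""
    return [json.loads(p.read_text()) for p in sorted(out_dir.glob("*.json"))]


with tempfile.TemporaryDirectory() as tmp:
    shared = Path(tmp)  # assumption: stands in for a shared S3 prefix
    for plant in ["plant_a", "plant_b", "plant_c"]:  # in KFP these run in parallel
        forecasting_step(plant, shared)
    collected = postprocessing_step(shared)
```

Note that the collecting step receives only the shared location, never the loop item. That mirrors the point made in the comments below: passing the loop variable into the postprocessing task is what would tie it to each iteration.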
  • (After multiple suggested edits: no, the postprocessing step is _not_ inside the loop, it is afterwards. That is exactly what collects the results.) – user12154148 Aug 04 '21 at 06:50
  • Are you aware of any documentation supporting that? When I try to recreate this approach, one `postprocessing_task` node appears in the graph for each of my `forecasting_task` equivalents. – cjs Dec 01 '21 at 17:57
  • Did you pass plant_item or predict_plant as an input to it? That would explain it. But no, I couldn't find any documentation, just trial and error. – user12154148 Dec 03 '21 at 06:29
0

At the moment this might not be supported:

Support inputs with multiple arguments #1933

Peter