0

I've got a PCollection where each element is a key, values tuple like this: (key, (value1,..,value_n) )

I need to split this PCollection in two processing branches.

As always, I need the whole pipeline to be as fast and use as little ram as possible.

Two ideas come to my mind:

Option 1: Split the PColl with a DoFn with multiple outputs

class SplitInTwo(beam.DoFn):

   def process(self, kvpair):
       key, values = kvpair
       
       yield beam.TaggedOutput('left', (key, values[0:2]))
       yield beam.TaggedOutput('right', (key, values[2:]))

class ProcessLeft(beam.DoFn):
   def process(self, kvpair):
       key,values = kvpair
       ...
       yield (key, results)

# class ProcessRight is similar to ProcessLeft

And then build the pipeline like this

   splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
   left = splitme.left | beam.ParDo(ProcessLeft())
   right = splitme.right | beam.ParDo(ProcessRight())

Option 2: Use two different DoFn on the original PCollection

Another option is using two DoFns to read and process the same PCollection. Just using one for the 'left' and 'right' hand sides of the data:

class ProcessLeft(beam.DoFn):

   def process(self, kvpair):
       key = kvpair[0]
       values = kvpair[0][0:2]
       ...
       yield (key,result)

# class ProcessRight is similar to ProcessLeft

Building the pipleline is simpler... (plus you don't need to track which tagged outputs you have):

   left = pcoll | beam.ParDo(ProcessLeft())
   right = pcoll| beam.ParDo(ProcessRight())

But... is it faster? will need less memory than the first one?

(I'm thinking about the first option might be fused by the runner - not just a Dataflow runner).

Jason Aller
  • 3,541
  • 28
  • 38
  • 38
Iñigo González
  • 3,735
  • 1
  • 11
  • 27

1 Answers1

3

In this case, both options would be fused by the runner, so both options would be somewhat similar in terms of performance. If you would like to reshuffle data into separate workers, then Option 1 is your best choice, as the serialized collection read by ProcessLeft and ProcessRight would be smaller.

   splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
   left = splitme.left | beam.Reshuffle() | beam.ParDo(ProcessLeft())
   right = splitme.right | beam.Reshuffle() | beam.ParDo(ProcessRight())

The Reshuffle transform would ensure that your data is written to an intermediate shuffle, and then consumed downstream. This would break the fusion.

Pablo
  • 10,425
  • 1
  • 44
  • 67
  • Wow. I didn't expect both left and right branches being fused in option#1. One of them? Yes. I guess, fusing both into the same node means data stays locally in the same node untill all the elements in the pardo have been consumed. – Iñigo González Dec 04 '20 at 19:59
  • 1
    In general, Beam/Dataflow/others will not break fusion between transforms unless there's a specific need for an aggregation (e.g. side inputs, group by key, reduce, etc). This generally works well, but it runs into issues when Step2/Step3 have a lot more work than Step 1 (see https://stackoverflow.com/questions/54121642/apache-beam-dataflow-reshuffle/54131856#54131856) - so you may not need to explicitly break the fusion via reshuffle unless that's the case. – Pablo Dec 04 '20 at 20:17