I have a pipeline that reads files from GCS, triggered through Pub/Sub:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return [element]

class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return element

...

    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
    | 'Log Results' >> beam.ParDo(LogFn())
    # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
    | "Read File from GCS" >> beam.io.ReadAllFromText()

The difference between LogFn and LogPassThroughFn is the type of the return value: LogFn returns a list, while LogPassThroughFn returns the string itself. LogFn works well in my tests, but LogPassThroughFn makes the pipeline fail to run. Per the answer to this issue:

Beam Python SDK still tries to interpret the output of that ParDo as if it was a collection of elements. And it does so by interpreting the string you emitted as collection of characters.

We know LogFn should work correctly.

However, I noticed that ExtractFileNameFn yields a string rather than a list. Is that correct? To test it, I changed it to yield a list in ExtractFileNameFn1, as below:

class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield [file_name]

...

    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
    | "Read File from GCS" >> beam.io.ReadAllFromText()

Now the pipeline fails to run.

My question is: what is the difference between returning a string and returning a list in a DoFn? Why can ReadAllFromText receive a string from ExtractFileNameFn, but a list from LogFn?

beam version: 2.14.0

zangw

1 Answer

The docs for ParDo say that:

Note that the DoFn must return an iterable for each element of the input PCollection. An easy way to do this is to use the yield keyword in the process method.

https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo

The purpose of returning an iterable is that your input elements may not map 1-1 with your output elements. A single input may produce multiple outputs.
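
As a plain-Python illustration of that one-to-many mapping (no Beam involved), here is a minimal sketch. The names `split_words` and `flatten_outputs` are hypothetical, and `flatten_outputs` is only a simplified model of how a runner consumes the result of `process`, not Beam's actual implementation:

```python
def split_words(line):
    """Written in the style of DoFn.process: zero or more outputs per input."""
    for word in line.split():
        yield word


def flatten_outputs(process, elements):
    """Simplified model of a runner (an assumption, not Beam's real code):
    iterate over whatever process returns and collect every item as one
    output element."""
    outputs = []
    for element in elements:
        outputs.extend(process(element))
    return outputs


# Three inputs produce three, zero, and one outputs respectively.
print(flatten_outputs(split_words, ["a b c", "", "d"]))  # ['a', 'b', 'c', 'd']
```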

You can yield them as you go, or you can gather them into a list and return it at the end.

So this:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

would be the same as this:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        return [file_name]

The output elements for both are strings, each output element being a file name.

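When you do yield [file_name], each output element is actually a list containing a string, which ReadAllFromText cannot use as a file pattern. LogFn works for the same reason as the return [file_name] version above: return [element] hands back a one-element list, so the single output element is still the string.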
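
The four cases discussed in the question can be compared side by side in plain Python. This is a sketch under an assumption: `emitted_elements` is a hypothetical helper that models the behavior described above (the return value of `process` is iterated, and each item becomes one output element). It is not Beam's actual code, and the function names and sample path are made up for illustration:

```python
def emitted_elements(process, element):
    """Simplified model (an assumption): iterate the result of process and
    treat each item as one output element."""
    result = process(element)
    return [] if result is None else list(result)


def yield_string(element):
    yield element          # like ExtractFileNameFn


def return_list(element):
    return [element]       # like LogFn


def yield_list(element):
    yield [element]        # like ExtractFileNameFn1


def return_string(element):
    return element         # like LogPassThroughFn


path = "gs://b/f"  # hypothetical file name
print(emitted_elements(yield_string, path))   # ['gs://b/f']
print(emitted_elements(return_list, path))    # ['gs://b/f']
print(emitted_elements(yield_list, path))     # [['gs://b/f']]
print(emitted_elements(return_string, path))  # ['g', 's', ':', '/', '/', 'b', '/', 'f']
```

Only the first two produce one string per file, which is what a downstream step expecting file names needs. The third produces a list per element, and the fourth produces one element per character.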

Alex